refactor(core): Use an IoC container to manage singleton classes [Part-1] (no-changelog) (#5509)

* add typedi

* convert ActiveWorkflowRunner into an injectable service

* convert ExternalHooks into an injectable service

* convert InternalHooks into an injectable service

* convert LoadNodesAndCredentials into an injectable service

* convert NodeTypes and CredentialTypes into an injectable service

* convert ActiveExecutions into an injectable service

* convert WaitTracker into an injectable service

* convert Push into an injectable service

* convert ActiveWebhooks and  TestWebhooks into an injectable services

* handle circular references, and log errors when a circular dependency is found
This commit is contained in:
कारतोफ्फेलस्क्रिप्ट™ 2023-02-21 19:21:56 +01:00 committed by GitHub
parent aca94bb995
commit 52f740b9e8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
79 changed files with 594 additions and 634 deletions

View file

@ -79,7 +79,8 @@
"qqjs>globby": "^11.1.0"
},
"patchedDependencies": {
"element-ui@2.15.12": "patches/element-ui@2.15.12.patch"
"element-ui@2.15.12": "patches/element-ui@2.15.12.patch",
"typedi@0.10.0": "patches/typedi@0.10.0.patch"
}
}
}

View file

@ -34,6 +34,7 @@ process.env.OCLIF_TS_NODE = '0';
require('express-async-errors');
require('source-map-support').install();
require('reflect-metadata');
require('@oclif/command')
.run()

View file

@ -109,6 +109,7 @@
"mock-jwks": "^1.0.9",
"nodemon": "^2.0.2",
"run-script-os": "^1.0.7",
"ts-essentials": "^7.0.3",
"tsc-alias": "^1.8.2",
"tsconfig-paths": "^4.1.2"
},
@ -188,6 +189,7 @@
"posthog-node": "^2.2.2",
"prom-client": "^13.1.0",
"psl": "^1.8.0",
"reflect-metadata": "^0.1.13",
"replacestream": "^4.0.3",
"semver": "^7.3.8",
"shelljs": "^0.8.5",
@ -196,6 +198,7 @@
"sse-channel": "^4.0.0",
"swagger-ui-express": "^4.3.0",
"syslog-client": "^1.1.1",
"typedi": "^0.10.0",
"typeorm": "^0.3.12",
"uuid": "^8.3.2",
"validator": "13.7.0",

View file

@ -1,3 +1,4 @@
import { Container } from 'typedi';
import { readFile } from 'fs/promises';
import type { Server } from 'http';
import type { Url } from 'url';
@ -12,7 +13,7 @@ import type { WebhookHttpMethod } from 'n8n-workflow';
import { ErrorReporterProxy as ErrorReporter, LoggerProxy as Logger } from 'n8n-workflow';
import config from '@/config';
import { N8N_VERSION, inDevelopment } from '@/constants';
import * as ActiveWorkflowRunner from '@/ActiveWorkflowRunner';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import * as Db from '@/Db';
import type { IExternalHooksClass } from '@/Interfaces';
import { ExternalHooks } from '@/ExternalHooks';
@ -23,7 +24,7 @@ import {
ServiceUnavailableError,
} from '@/ResponseHelper';
import { corsMiddleware } from '@/middlewares';
import * as TestWebhooks from '@/TestWebhooks';
import { TestWebhooks } from '@/TestWebhooks';
import { WaitingWebhooks } from '@/WaitingWebhooks';
import { WEBHOOK_METHODS } from '@/WebhookHelpers';
@ -36,7 +37,7 @@ export abstract class AbstractServer {
protected externalHooks: IExternalHooksClass;
protected activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner;
protected activeWorkflowRunner: ActiveWorkflowRunner;
protected protocol: string;
@ -71,8 +72,8 @@ export abstract class AbstractServer {
this.endpointWebhookTest = config.getEnv('endpoints.webhookTest');
this.endpointWebhookWaiting = config.getEnv('endpoints.webhookWaiting');
this.externalHooks = ExternalHooks();
this.activeWorkflowRunner = ActiveWorkflowRunner.getInstance();
this.externalHooks = Container.get(ExternalHooks);
this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
}
private async setupErrorHandlers() {
@ -338,7 +339,7 @@ export abstract class AbstractServer {
// ----------------------------------------
protected setupTestWebhookEndpoint() {
const endpoint = this.endpointWebhookTest;
const testWebhooks = TestWebhooks.getInstance();
const testWebhooks = Container.get(TestWebhooks);
// Register all test webhook requests (for testing via the UI)
this.app.all(`/${endpoint}/*`, async (req, res) => {

View file

@ -26,7 +26,9 @@ import type {
} from '@/Interfaces';
import * as ResponseHelper from '@/ResponseHelper';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import { Service } from 'typedi';
@Service()
export class ActiveExecutions {
private activeExecutions: {
[index: string]: IExecutingWorkflowData;
@ -34,7 +36,6 @@ export class ActiveExecutions {
/**
* Add a new active execution
*
*/
async add(
executionData: IWorkflowExecutionDataProcess,
@ -253,13 +254,3 @@ export class ActiveExecutions {
return this.activeExecutions[executionId].status;
}
}
let activeExecutionsInstance: ActiveExecutions | undefined;
export function getInstance(): ActiveExecutions {
if (activeExecutionsInstance === undefined) {
activeExecutionsInstance = new ActiveExecutions();
}
return activeExecutionsInstance;
}

View file

@ -1,3 +1,4 @@
import { Service } from 'typedi';
import type {
IWebhookData,
WebhookHttpMethod,
@ -6,8 +7,9 @@ import type {
WorkflowExecuteMode,
} from 'n8n-workflow';
import * as NodeExecuteFunctions from './NodeExecuteFunctions';
import * as NodeExecuteFunctions from 'n8n-core';
@Service()
export class ActiveWebhooks {
private workflowWebhooks: {
[key: string]: IWebhookData[];

View file

@ -8,6 +8,8 @@
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { Container, Service } from 'typedi';
import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core';
import type {
@ -55,7 +57,7 @@ import config from '@/config';
import type { User } from '@db/entities/User';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { WebhookEntity } from '@db/entities/WebhookEntity';
import * as ActiveExecutions from '@/ActiveExecutions';
import { ActiveExecutions } from '@/ActiveExecutions';
import { createErrorExecution } from '@/GenericHelpers';
import { WORKFLOW_REACTIVATE_INITIAL_TIMEOUT, WORKFLOW_REACTIVATE_MAX_TIMEOUT } from '@/constants';
import { NodeTypes } from '@/NodeTypes';
@ -68,8 +70,9 @@ import { START_NODES } from './constants';
const WEBHOOK_PROD_UNREGISTERED_HINT =
"The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)";
@Service()
export class ActiveWorkflowRunner {
private activeWorkflows: ActiveWorkflows | null = null;
private activeWorkflows = new ActiveWorkflows();
private activationErrors: {
[key: string]: IActivationError;
@ -79,9 +82,7 @@ export class ActiveWorkflowRunner {
[key: string]: IQueuedWorkflowActivations;
} = {};
constructor() {
this.activeWorkflows = new ActiveWorkflows();
}
constructor(private externalHooks: ExternalHooks) {}
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
async init() {
@ -133,7 +134,7 @@ export class ActiveWorkflowRunner {
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
Logger.info(` ${error.message}`);
Logger.error(
`Issue on intital workflow activation try "${workflowData.name}" (startup)`,
`Issue on initial workflow activation try "${workflowData.name}" (startup)`,
{
workflowName: workflowData.name,
workflowId: workflowData.id,
@ -148,21 +149,18 @@ export class ActiveWorkflowRunner {
}
Logger.verbose('Finished initializing active workflows (startup)');
}
const externalHooks = ExternalHooks();
await externalHooks.run('activeWorkflows.initialized', []);
await this.externalHooks.run('activeWorkflows.initialized', []);
}
/**
* Removes all the currently active workflows
*
*/
async removeAll(): Promise<void> {
let activeWorkflowIds: string[] = [];
Logger.verbose('Call to remove all active workflows received (removeAll)');
if (this.activeWorkflows !== null) {
activeWorkflowIds.push.apply(activeWorkflowIds, this.activeWorkflows.allActiveWorkflows());
}
activeWorkflowIds.push.apply(activeWorkflowIds, this.activeWorkflows.allActiveWorkflows());
const activeWorkflows = await this.getActiveWorkflows();
activeWorkflowIds = [
@ -183,7 +181,6 @@ export class ActiveWorkflowRunner {
/**
* Checks if a webhook for the given method and path exists and executes the workflow.
*
*/
async executeWebhook(
httpMethod: WebhookHttpMethod,
@ -192,11 +189,6 @@ export class ActiveWorkflowRunner {
res: express.Response,
): Promise<IResponseCallbackData> {
Logger.debug(`Received webhook "${httpMethod}" for path "${path}"`);
if (this.activeWorkflows === null) {
throw new ResponseHelper.NotFoundError(
'The "activeWorkflows" instance did not get initialized yet.',
);
}
// Reset request parameters
req.params = {};
@ -279,7 +271,7 @@ export class ActiveWorkflowRunner {
);
}
const nodeTypes = NodeTypes();
const nodeTypes = Container.get(NodeTypes);
const workflow = new Workflow({
id: webhook.workflowId,
name: workflowData.name,
@ -482,6 +474,7 @@ export class ActiveWorkflowRunner {
try {
await this.removeWorkflowWebhooks(workflow.id as string);
} catch (error) {
ErrorReporter.error(error);
Logger.error(
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
`Could not remove webhooks of workflow "${workflow.id}" because of error: "${error.message}"`,
@ -521,7 +514,7 @@ export class ActiveWorkflowRunner {
throw new Error(`Could not find workflow with id "${workflowId}"`);
}
const nodeTypes = NodeTypes();
const nodeTypes = Container.get(NodeTypes);
const workflow = new Workflow({
id: workflowId,
name: workflowData.name,
@ -645,7 +638,7 @@ export class ActiveWorkflowRunner {
if (donePromise) {
executePromise.then((executionId) => {
ActiveExecutions.getInstance()
Container.get(ActiveExecutions)
.getPostExecutePromise(executionId)
.then(donePromise.resolve)
.catch(donePromise.reject);
@ -702,7 +695,7 @@ export class ActiveWorkflowRunner {
if (donePromise) {
executePromise.then((executionId) => {
ActiveExecutions.getInstance()
Container.get(ActiveExecutions)
.getPostExecutePromise(executionId)
.then(donePromise.resolve)
.catch(donePromise.reject);
@ -723,7 +716,7 @@ export class ActiveWorkflowRunner {
// Remove the workflow as "active"
await this.activeWorkflows?.remove(workflowData.id);
await this.activeWorkflows.remove(workflowData.id);
this.activationErrors[workflowData.id] = {
time: new Date().getTime(),
error: {
@ -777,10 +770,6 @@ export class ActiveWorkflowRunner {
activation: WorkflowActivateMode,
workflowData?: IWorkflowDb,
): Promise<void> {
if (this.activeWorkflows === null) {
throw new Error('The "activeWorkflows" instance did not get initialized yet.');
}
let workflowInstance: Workflow;
try {
if (workflowData === undefined) {
@ -793,7 +782,7 @@ export class ActiveWorkflowRunner {
if (!workflowData) {
throw new Error(`Could not find workflow with id "${workflowId}".`);
}
const nodeTypes = NodeTypes();
const nodeTypes = Container.get(NodeTypes);
workflowInstance = new Workflow({
id: workflowId,
name: workflowData.name,
@ -978,47 +967,31 @@ export class ActiveWorkflowRunner {
*/
// TODO: this should happen in a transaction
async remove(workflowId: string): Promise<void> {
if (this.activeWorkflows !== null) {
// Remove all the webhooks of the workflow
try {
await this.removeWorkflowWebhooks(workflowId);
} catch (error) {
ErrorReporter.error(error);
Logger.error(
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
`Could not remove webhooks of workflow "${workflowId}" because of error: "${error.message}"`,
);
}
if (this.activationErrors[workflowId] !== undefined) {
// If there were any activation errors delete them
delete this.activationErrors[workflowId];
}
if (this.queuedWorkflowActivations[workflowId] !== undefined) {
this.removeQueuedWorkflowActivation(workflowId);
}
// if it's active in memory then it's a trigger
// so remove from list of actives workflows
if (this.activeWorkflows.isActive(workflowId)) {
await this.activeWorkflows.remove(workflowId);
Logger.verbose(`Successfully deactivated workflow "${workflowId}"`, { workflowId });
}
return;
// Remove all the webhooks of the workflow
try {
await this.removeWorkflowWebhooks(workflowId);
} catch (error) {
ErrorReporter.error(error);
Logger.error(
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
`Could not remove webhooks of workflow "${workflowId}" because of error: "${error.message}"`,
);
}
throw new Error('The "activeWorkflows" instance did not get initialized yet.');
if (this.activationErrors[workflowId] !== undefined) {
// If there were any activation errors delete them
delete this.activationErrors[workflowId];
}
if (this.queuedWorkflowActivations[workflowId] !== undefined) {
this.removeQueuedWorkflowActivation(workflowId);
}
// if it's active in memory then it's a trigger
// so remove from list of actives workflows
if (this.activeWorkflows.isActive(workflowId)) {
await this.activeWorkflows.remove(workflowId);
Logger.verbose(`Successfully deactivated workflow "${workflowId}"`, { workflowId });
}
}
}
let workflowRunnerInstance: ActiveWorkflowRunner | undefined;
export function getInstance(): ActiveWorkflowRunner {
if (workflowRunnerInstance === undefined) {
workflowRunnerInstance = new ActiveWorkflowRunner();
}
return workflowRunnerInstance;
}

View file

@ -1,14 +1,12 @@
import { loadClassInIsolation } from 'n8n-core';
import type {
ICredentialType,
ICredentialTypes,
INodesAndCredentials,
LoadedClass,
} from 'n8n-workflow';
import type { ICredentialType, ICredentialTypes, LoadedClass } from 'n8n-workflow';
import { Service } from 'typedi';
import { RESPONSE_ERROR_MESSAGES } from './constants';
import { LoadNodesAndCredentials } from './LoadNodesAndCredentials';
class CredentialTypesClass implements ICredentialTypes {
constructor(private nodesAndCredentials: INodesAndCredentials) {
@Service()
export class CredentialTypes implements ICredentialTypes {
constructor(private nodesAndCredentials: LoadNodesAndCredentials) {
nodesAndCredentials.credentialTypes = this;
}
@ -64,18 +62,3 @@ class CredentialTypesClass implements ICredentialTypes {
return this.nodesAndCredentials.known.credentials;
}
}
let credentialTypesInstance: CredentialTypesClass | undefined;
// eslint-disable-next-line @typescript-eslint/naming-convention
export function CredentialTypes(nodesAndCredentials?: INodesAndCredentials): CredentialTypesClass {
if (!credentialTypesInstance) {
if (nodesAndCredentials) {
credentialTypesInstance = new CredentialTypesClass(nodesAndCredentials);
} else {
throw new Error('CredentialTypes not initialized yet');
}
}
return credentialTypesInstance;
}

View file

@ -32,7 +32,6 @@ import type {
IHttpRequestHelper,
INodeTypeData,
INodeTypes,
ICredentialTypes,
} from 'n8n-workflow';
import {
ICredentialsHelper,
@ -54,6 +53,7 @@ import { CredentialTypes } from '@/CredentialTypes';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { whereClause } from './UserManagement/UserManagementHelper';
import { RESPONSE_ERROR_MESSAGES } from './constants';
import { Container } from 'typedi';
const mockNode = {
name: '',
@ -87,8 +87,8 @@ const mockNodeTypes: INodeTypes = {
export class CredentialsHelper extends ICredentialsHelper {
constructor(
encryptionKey: string,
private credentialTypes: ICredentialTypes = CredentialTypes(),
private nodeTypes: INodeTypes = NodeTypes(),
private credentialTypes = Container.get(CredentialTypes),
private nodeTypes = Container.get(NodeTypes),
) {
super(encryptionKey);
}

View file

@ -1,6 +1,7 @@
/* eslint-disable @typescript-eslint/no-var-requires */
/* eslint-disable import/no-dynamic-require */
/* eslint-disable no-restricted-syntax */
import { Service } from 'typedi';
import * as Db from '@/Db';
import type {
IExternalHooksClass,
@ -10,7 +11,8 @@ import type {
import config from '@/config';
class ExternalHooksClass implements IExternalHooksClass {
@Service()
export class ExternalHooks implements IExternalHooksClass {
externalHooks: {
[key: string]: Array<() => {}>;
} = {};
@ -103,14 +105,3 @@ class ExternalHooksClass implements IExternalHooksClass {
return !!this.externalHooks[hookName];
}
}
let externalHooksInstance: ExternalHooksClass | undefined;
// eslint-disable-next-line @typescript-eslint/naming-convention
export function ExternalHooks(): ExternalHooksClass {
if (externalHooksInstance === undefined) {
externalHooksInstance = new ExternalHooksClass();
}
return externalHooksInstance;
}

View file

@ -1,12 +1,12 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import { Service } from 'typedi';
import { snakeCase } from 'change-case';
import { BinaryDataManager } from 'n8n-core';
import type {
ExecutionStatus,
INodesGraphResult,
INodeTypes,
IRun,
ITelemetryTrackProperties,
IWorkflowBase,
@ -22,13 +22,14 @@ import type {
IExecutionTrackProperties,
IWorkflowExecutionDataProcess,
} from '@/Interfaces';
import type { Telemetry } from '@/telemetry';
import { Telemetry } from '@/telemetry';
import type { AuthProviderType } from '@db/entities/AuthIdentity';
import { RoleService } from './role/role.service';
import { eventBus } from './eventbus';
import type { User } from '@db/entities/User';
import { N8N_VERSION } from '@/constants';
import * as Db from '@/Db';
import { NodeTypes } from './NodeTypes';
function userToPayload(user: User): {
userId: string;
@ -46,12 +47,17 @@ function userToPayload(user: User): {
};
}
export class InternalHooksClass implements IInternalHooksClass {
constructor(
private telemetry: Telemetry,
private instanceId: string,
private nodeTypes: INodeTypes,
) {}
@Service()
export class InternalHooks implements IInternalHooksClass {
private instanceId: string;
constructor(private telemetry: Telemetry, private nodeTypes: NodeTypes) {}
async init(instanceId: string) {
this.instanceId = instanceId;
this.telemetry.setInstanceId(instanceId);
await this.telemetry.init();
}
async onServerStarted(
diagnosticInfo: IDiagnosticInfo,

View file

@ -1,30 +0,0 @@
import type { INodeTypes } from 'n8n-workflow';
import { InternalHooksClass } from '@/InternalHooks';
import { Telemetry } from '@/telemetry';
import type { PostHogClient } from './posthog';
export class InternalHooksManager {
private static internalHooksInstance: InternalHooksClass;
static getInstance(): InternalHooksClass {
if (this.internalHooksInstance) {
return this.internalHooksInstance;
}
throw new Error('InternalHooks not initialized');
}
static async init(
instanceId: string,
nodeTypes: INodeTypes,
postHog: PostHogClient,
): Promise<InternalHooksClass> {
if (!this.internalHooksInstance) {
const telemetry = new Telemetry(instanceId, postHog);
await telemetry.init();
this.internalHooksInstance = new InternalHooksClass(telemetry, instanceId, nodeTypes);
}
return this.internalHooksInstance;
}
}

View file

@ -15,7 +15,8 @@ import {
import type { User } from '@db/entities/User';
import type { Role } from '@db/entities/Role';
import type { RunningMode, SyncStatus } from '@db/entities/AuthProviderSyncHistory';
import { InternalHooksManager } from '@/InternalHooksManager';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
export class LdapSync {
private intervalId: NodeJS.Timeout | undefined = undefined;
@ -104,7 +105,7 @@ export class LdapSync {
);
if (usersToDisable.length) {
void InternalHooksManager.getInstance().onLdapUsersDisabled({
void Container.get(InternalHooks).onLdapUsersDisabled({
reason: 'ldap_update',
users: usersToDisable.length,
user_ids: usersToDisable,
@ -144,7 +145,7 @@ export class LdapSync {
error: errorMessage,
});
void InternalHooksManager.getInstance().onLdapSyncFinished({
void Container.get(InternalHooks).onLdapSyncFinished({
type: !this.intervalId ? 'scheduled' : `manual_${mode}`,
succeeded: true,
users_synced: usersToCreate.length + usersToUpdate.length + usersToDisable.length,

View file

@ -22,9 +22,10 @@ import {
LDAP_LOGIN_LABEL,
} from './constants';
import type { ConnectionSecurity, LdapConfig } from './types';
import { InternalHooksManager } from '@/InternalHooksManager';
import { jsonParse, LoggerProxy as Logger } from 'n8n-workflow';
import { getLicense } from '@/License';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
/**
* Check whether the LDAP feature is disabled in the instance
@ -162,7 +163,7 @@ export const updateLdapConfig = async (ldapConfig: LdapConfig): Promise<void> =>
const ldapUsers = await getLdapUsers();
if (ldapUsers.length) {
await deleteAllLdapIdentities();
void InternalHooksManager.getInstance().onLdapUsersDisabled({
void Container.get(InternalHooks).onLdapUsersDisabled({
reason: 'ldap_update',
users: ldapUsers.length,
user_ids: ldapUsers.map((user) => user.id),
@ -185,7 +186,7 @@ export const handleLdapInit = async (): Promise<void> => {
if (!isLdapEnabled()) {
const ldapUsers = await getLdapUsers();
if (ldapUsers.length) {
void InternalHooksManager.getInstance().onLdapUsersDisabled({
void Container.get(InternalHooks).onLdapUsersDisabled({
reason: 'ldap_feature_deactivated',
users: ldapUsers.length,
user_ids: ldapUsers.map((user) => user.id),
@ -238,7 +239,7 @@ export const findAndAuthenticateLdapUser = async (
);
} catch (e) {
if (e instanceof Error) {
void InternalHooksManager.getInstance().onLdapLoginSyncFailed({
void Container.get(InternalHooks).onLdapLoginSyncFailed({
error: e.message,
});
Logger.error('LDAP - Error during search', { message: e.message });

View file

@ -2,9 +2,10 @@ import express from 'express';
import { LdapManager } from '../LdapManager.ee';
import { getLdapConfig, getLdapSynchronizations, updateLdapConfig } from '../helpers';
import type { LdapConfiguration } from '../types';
import { InternalHooksManager } from '@/InternalHooksManager';
import pick from 'lodash.pick';
import { NON_SENSIBLE_LDAP_CONFIG_PROPERTIES } from '../constants';
import { InternalHooks } from '@/InternalHooks';
import { Container } from 'typedi';
export const ldapController = express.Router();
@ -42,7 +43,7 @@ ldapController.put('/config', async (req: LdapConfiguration.Update, res: express
const data = await getLdapConfig();
void InternalHooksManager.getInstance().onUserUpdatedLdapSettings({
void Container.get(InternalHooks).onUserUpdatedLdapSettings({
user_id: req.user.id,
...pick(data, NON_SENSIBLE_LDAP_CONFIG_PROPERTIES),
});

View file

@ -31,13 +31,11 @@ import {
CUSTOM_API_CALL_NAME,
inTest,
} from '@/constants';
import {
persistInstalledPackageData,
removePackageFromDatabase,
} from '@/CommunityNodes/packageModel';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { Service } from 'typedi';
export class LoadNodesAndCredentialsClass implements INodesAndCredentials {
@Service()
export class LoadNodesAndCredentials implements INodesAndCredentials {
known: KnownNodesAndCredentials = { nodes: {}, credentials: {} };
loaded: LoadedNodesAndCredentials = { nodes: {}, credentials: {} };
@ -202,6 +200,7 @@ export class LoadNodesAndCredentialsClass implements INodesAndCredentials {
if (loader.loadedNodes.length > 0) {
// Save info to DB
try {
const { persistInstalledPackageData } = await import('@/CommunityNodes/packageModel');
const installedPackage = await persistInstalledPackageData(loader);
await this.postProcessLoaders();
await this.generateTypesForFrontend();
@ -229,6 +228,7 @@ export class LoadNodesAndCredentialsClass implements INodesAndCredentials {
await executeCommand(command);
const { removePackageFromDatabase } = await import('@/CommunityNodes/packageModel');
await removePackageFromDatabase(installedPackage);
if (packageName in this.loaders) {
@ -264,6 +264,9 @@ export class LoadNodesAndCredentialsClass implements INodesAndCredentials {
if (loader.loadedNodes.length > 0) {
// Save info to DB
try {
const { persistInstalledPackageData, removePackageFromDatabase } = await import(
'@/CommunityNodes/packageModel'
);
await removePackageFromDatabase(installedPackage);
const newlyInstalledPackage = await persistInstalledPackageData(loader);
await this.postProcessLoaders();
@ -420,14 +423,3 @@ export class LoadNodesAndCredentialsClass implements INodesAndCredentials {
throw new Error('Could not find "node_modules" folder!');
}
}
let packagesInformationInstance: LoadNodesAndCredentialsClass | undefined;
// eslint-disable-next-line @typescript-eslint/naming-convention
export function LoadNodesAndCredentials(): LoadNodesAndCredentialsClass {
if (packagesInformationInstance === undefined) {
packagesInformationInstance = new LoadNodesAndCredentialsClass();
}
return packagesInformationInstance;
}

View file

@ -1,6 +1,5 @@
import { loadClassInIsolation } from 'n8n-core';
import type {
INodesAndCredentials,
INodeType,
INodeTypeDescription,
INodeTypes,
@ -8,10 +7,13 @@ import type {
LoadedClass,
} from 'n8n-workflow';
import { NodeHelpers } from 'n8n-workflow';
import { Service } from 'typedi';
import { RESPONSE_ERROR_MESSAGES } from './constants';
import { LoadNodesAndCredentials } from './LoadNodesAndCredentials';
export class NodeTypesClass implements INodeTypes {
constructor(private nodesAndCredentials: INodesAndCredentials) {
@Service()
export class NodeTypes implements INodeTypes {
constructor(private nodesAndCredentials: LoadNodesAndCredentials) {
// Some nodeTypes need to get special parameters applied like the
// polling nodes the polling times
this.applySpecialNodeParameters();
@ -75,18 +77,3 @@ export class NodeTypesClass implements INodeTypes {
return this.nodesAndCredentials.known.nodes;
}
}
let nodeTypesInstance: NodeTypesClass | undefined;
// eslint-disable-next-line @typescript-eslint/naming-convention
export function NodeTypes(nodesAndCredentials?: INodesAndCredentials): NodeTypesClass {
if (!nodeTypesInstance) {
if (nodesAndCredentials) {
nodeTypesInstance = new NodeTypesClass(nodesAndCredentials);
} else {
throw new Error('NodeTypes not initialized yet');
}
}
return nodeTypesInstance;
}

View file

@ -13,8 +13,9 @@ import type { JsonObject } from 'swagger-ui-express';
import config from '@/config';
import * as Db from '@/Db';
import { InternalHooksManager } from '@/InternalHooksManager';
import { getInstanceBaseUrl } from '@/UserManagement/UserManagementHelper';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
async function createApiRouter(
version: string,
@ -100,7 +101,7 @@ async function createApiRouter(
if (!user) return false;
void InternalHooksManager.getInstance().onUserInvokedApi({
void Container.get(InternalHooks).onUserInvokedApi({
user_id: user.id,
path: req.path,
method: req.method,

View file

@ -19,6 +19,7 @@ import {
saveCredential,
toJsonSchema,
} from './credentials.service';
import { Container } from 'typedi';
export = {
createCredential: [
@ -87,7 +88,7 @@ export = {
const { credentialTypeName } = req.params;
try {
CredentialTypes().getByName(credentialTypeName);
Container.get(CredentialTypes).getByName(credentialTypeName);
} catch (error) {
return res.status(404).json({ message: 'Not Found' });
}

View file

@ -7,6 +7,7 @@ import { CredentialsHelper } from '@/CredentialsHelper';
import { CredentialTypes } from '@/CredentialTypes';
import type { CredentialRequest } from '../../../types';
import { toJsonSchema } from './credentials.service';
import { Container } from 'typedi';
export const validCredentialType = (
req: CredentialRequest.Create,
@ -14,7 +15,7 @@ export const validCredentialType = (
next: express.NextFunction,
): express.Response | void => {
try {
CredentialTypes().getByName(req.body.type);
Container.get(CredentialTypes).getByName(req.body.type);
} catch (_) {
return res.status(400).json({ message: 'req.body.type is not a known type' });
}

View file

@ -8,6 +8,7 @@ import type { User } from '@db/entities/User';
import { ExternalHooks } from '@/ExternalHooks';
import type { IDependency, IJsonSchema } from '../../../types';
import type { CredentialRequest } from '@/requests';
import { Container } from 'typedi';
export async function getCredentials(credentialId: string): Promise<ICredentialsDb | null> {
return Db.collections.Credentials.findOneBy({ id: credentialId });
@ -62,7 +63,7 @@ export async function saveCredential(
scope: 'credential',
});
await ExternalHooks().run('credentials.create', [encryptedData]);
await Container.get(ExternalHooks).run('credentials.create', [encryptedData]);
return Db.transaction(async (transactionManager) => {
const savedCredential = await transactionManager.save<CredentialsEntity>(credential);
@ -84,7 +85,7 @@ export async function saveCredential(
}
export async function removeCredential(credentials: CredentialsEntity): Promise<ICredentialsDb> {
await ExternalHooks().run('credentials.delete', [credentials.id]);
await Container.get(ExternalHooks).run('credentials.delete', [credentials.id]);
return Db.collections.Credentials.remove(credentials);
}

View file

@ -8,12 +8,13 @@ import {
deleteExecution,
getExecutionsCount,
} from './executions.service';
import * as ActiveExecutions from '@/ActiveExecutions';
import { ActiveExecutions } from '@/ActiveExecutions';
import { authorize, validCursor } from '../../shared/middlewares/global.middleware';
import type { ExecutionRequest } from '../../../types';
import { getSharedWorkflowIds } from '../workflows/workflows.service';
import { encodeNextCursor } from '../../shared/services/pagination.service';
import { InternalHooksManager } from '@/InternalHooksManager';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
export = {
deleteExecution: [
@ -66,7 +67,7 @@ export = {
return res.status(404).json({ message: 'Not Found' });
}
void InternalHooksManager.getInstance().onUserRetrievedExecution({
void Container.get(InternalHooks).onUserRetrievedExecution({
user_id: req.user.id,
public_api: true,
});
@ -95,7 +96,7 @@ export = {
}
// get running workflows so we exclude them from the result
const runningExecutionsIds = ActiveExecutions.getInstance()
const runningExecutionsIds = Container.get(ActiveExecutions)
.getActiveExecutions()
.map(({ id }) => id);
@ -116,7 +117,7 @@ export = {
const count = await getExecutionsCount(filters);
void InternalHooksManager.getInstance().onUserRetrievedAllExecutions({
void Container.get(InternalHooks).onUserRetrievedAllExecutions({
user_id: req.user.id,
public_api: true,
});

View file

@ -1,12 +1,11 @@
import type express from 'express';
import { Container } from 'typedi';
import type { FindManyOptions, FindOptionsWhere } from 'typeorm';
import { In } from 'typeorm';
import * as ActiveWorkflowRunner from '@/ActiveWorkflowRunner';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import config from '@/config';
import { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { InternalHooksManager } from '@/InternalHooksManager';
import { ExternalHooks } from '@/ExternalHooks';
import { addNodeIds, replaceInvalidCredentials } from '@/WorkflowHelpers';
import type { WorkflowRequest } from '../../../types';
@ -29,6 +28,7 @@ import {
parseTagNames,
} from './workflows.service';
import { WorkflowsService } from '@/workflows/workflows.services';
import { InternalHooks } from '@/InternalHooks';
export = {
createWorkflow: [
@ -50,8 +50,8 @@ export = {
const createdWorkflow = await createWorkflow(workflow, req.user, role);
await ExternalHooks().run('workflow.afterCreate', [createdWorkflow]);
void InternalHooksManager.getInstance().onWorkflowCreated(req.user, createdWorkflow, true);
await Container.get(ExternalHooks).run('workflow.afterCreate', [createdWorkflow]);
void Container.get(InternalHooks).onWorkflowCreated(req.user, createdWorkflow, true);
return res.json(createdWorkflow);
},
@ -84,7 +84,7 @@ export = {
return res.status(404).json({ message: 'Not Found' });
}
void InternalHooksManager.getInstance().onUserRetrievedWorkflow({
void Container.get(InternalHooks).onUserRetrievedWorkflow({
user_id: req.user.id,
public_api: true,
});
@ -145,7 +145,7 @@ export = {
count = await getWorkflowsCount(query);
}
void InternalHooksManager.getInstance().onUserRetrievedAllWorkflows({
void Container.get(InternalHooks).onUserRetrievedAllWorkflows({
user_id: req.user.id,
public_api: true,
});
@ -182,7 +182,7 @@ export = {
await replaceInvalidCredentials(updateData);
addNodeIds(updateData);
const workflowRunner = ActiveWorkflowRunner.getInstance();
const workflowRunner = Container.get(ActiveWorkflowRunner);
if (sharedWorkflow.workflow.active) {
// When workflow gets saved always remove it as the triggers could have been
@ -210,8 +210,8 @@ export = {
const updatedWorkflow = await getWorkflowById(sharedWorkflow.workflowId);
await ExternalHooks().run('workflow.afterUpdate', [updateData]);
void InternalHooksManager.getInstance().onWorkflowSaved(req.user, updateData, true);
await Container.get(ExternalHooks).run('workflow.afterUpdate', [updateData]);
void Container.get(InternalHooks).onWorkflowSaved(req.user, updateData, true);
return res.json(updatedWorkflow);
},
@ -231,7 +231,7 @@ export = {
if (!sharedWorkflow.workflow.active) {
try {
await ActiveWorkflowRunner.getInstance().add(sharedWorkflow.workflowId, 'activate');
await Container.get(ActiveWorkflowRunner).add(sharedWorkflow.workflowId, 'activate');
} catch (error) {
if (error instanceof Error) {
return res.status(400).json({ message: error.message });
@ -263,7 +263,7 @@ export = {
return res.status(404).json({ message: 'Not Found' });
}
const workflowRunner = ActiveWorkflowRunner.getInstance();
const workflowRunner = Container.get(ActiveWorkflowRunner);
if (sharedWorkflow.workflow.active) {
await workflowRunner.remove(sharedWorkflow.workflowId);

View file

@ -2,8 +2,9 @@ import type Bull from 'bull';
import type { RedisOptions } from 'ioredis';
import type { IExecuteResponsePromiseData } from 'n8n-workflow';
import config from '@/config';
import * as ActiveExecutions from '@/ActiveExecutions';
import { ActiveExecutions } from '@/ActiveExecutions';
import * as WebhookHelpers from '@/WebhookHelpers';
import { Container } from 'typedi';
export type JobId = Bull.JobId;
export type Job = Bull.Job<JobData>;
@ -26,7 +27,7 @@ export interface WebhookResponse {
export class Queue {
private jobQueue: JobQueue;
constructor(private activeExecutions: ActiveExecutions.ActiveExecutions) {}
constructor(private activeExecutions: ActiveExecutions) {}
async init() {
const prefix = config.getEnv('queue.bull.prefix');
@ -95,7 +96,7 @@ let activeQueueInstance: Queue | undefined;
export async function getInstance(): Promise<Queue> {
if (activeQueueInstance === undefined) {
activeQueueInstance = new Queue(ActiveExecutions.getInstance());
activeQueueInstance = new Queue(Container.get(ActiveExecutions));
await activeQueueInstance.init();
}

View file

@ -1,13 +1,13 @@
import path from 'path';
import { realpath, access } from 'fs/promises';
import type { LoadNodesAndCredentialsClass } from '@/LoadNodesAndCredentials';
import type { NodeTypesClass } from '@/NodeTypes';
import type { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import type { NodeTypes } from '@/NodeTypes';
import type { Push } from '@/push';
export const reloadNodesAndCredentials = async (
loadNodesAndCredentials: LoadNodesAndCredentialsClass,
nodeTypes: NodeTypesClass,
loadNodesAndCredentials: LoadNodesAndCredentials,
nodeTypes: NodeTypes,
push: Push,
) => {
// eslint-disable-next-line import/no-extraneous-dependencies

View file

@ -57,7 +57,6 @@ import history from 'connect-history-api-fallback';
import config from '@/config';
import * as Queue from '@/Queue';
import { InternalHooksManager } from '@/InternalHooksManager';
import { getSharedWorkflowIds } from '@/WorkflowHelpers';
import { nodesController } from '@/api/nodes.api';
@ -67,7 +66,6 @@ import {
GENERATED_STATIC_DIR,
inDevelopment,
N8N_VERSION,
NODES_BASE_DIR,
RESPONSE_ERROR_MESSAGES,
TEMPLATES_DIR,
} from '@/constants';
@ -114,7 +112,7 @@ import type {
IExecutionsStopData,
IN8nUISettings,
} from '@/Interfaces';
import * as ActiveExecutions from '@/ActiveExecutions';
import { ActiveExecutions } from '@/ActiveExecutions';
import {
CredentialsHelper,
getCredentialForUser,
@ -123,11 +121,8 @@ import {
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { CredentialTypes } from '@/CredentialTypes';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import type { LoadNodesAndCredentialsClass } from '@/LoadNodesAndCredentials';
import type { NodeTypesClass } from '@/NodeTypes';
import { NodeTypes } from '@/NodeTypes';
import * as ResponseHelper from '@/ResponseHelper';
import type { WaitTrackerClass } from '@/WaitTracker';
import { WaitTracker } from '@/WaitTracker';
import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
@ -136,8 +131,7 @@ import { eventBusRouter } from '@/eventbus/eventBusRoutes';
import { isLogStreamingEnabled } from '@/eventbus/MessageEventBus/MessageEventBusHelper';
import { getLicense } from '@/License';
import { licenseController } from './license/license.controller';
import type { Push } from '@/push';
import { getPushInstance, setupPushServer, setupPushHandler } from '@/push';
import { Push, setupPushServer, setupPushHandler } from '@/push';
import { setupAuthMiddlewares } from './middlewares';
import { initEvents } from './events';
import { ldapController } from './Ldap/routes/ldap.controller.ee';
@ -149,23 +143,25 @@ import { setupExternalJWTAuth } from './middlewares/externalJWTAuth';
import { PostHogClient } from './posthog';
import { eventBus } from './eventbus';
import { isSamlEnabled } from './Saml/helpers';
import { Container } from 'typedi';
import { InternalHooks } from './InternalHooks';
const exec = promisify(callbackExec);
class Server extends AbstractServer {
endpointPresetCredentials: string;
waitTracker: WaitTrackerClass;
waitTracker: WaitTracker;
activeExecutionsInstance: ActiveExecutions.ActiveExecutions;
activeExecutionsInstance: ActiveExecutions;
frontendSettings: IN8nUISettings;
presetCredentialsLoaded: boolean;
loadNodesAndCredentials: LoadNodesAndCredentialsClass;
loadNodesAndCredentials: LoadNodesAndCredentials;
nodeTypes: NodeTypesClass;
nodeTypes: NodeTypes;
credentialTypes: ICredentialTypes;
@ -176,18 +172,18 @@ class Server extends AbstractServer {
constructor() {
super();
this.nodeTypes = NodeTypes();
this.credentialTypes = CredentialTypes();
this.loadNodesAndCredentials = LoadNodesAndCredentials();
this.loadNodesAndCredentials = Container.get(LoadNodesAndCredentials);
this.credentialTypes = Container.get(CredentialTypes);
this.nodeTypes = Container.get(NodeTypes);
this.activeExecutionsInstance = ActiveExecutions.getInstance();
this.waitTracker = WaitTracker();
this.postHog = new PostHogClient();
this.activeExecutionsInstance = Container.get(ActiveExecutions);
this.waitTracker = Container.get(WaitTracker);
this.postHog = Container.get(PostHogClient);
this.presetCredentialsLoaded = false;
this.endpointPresetCredentials = config.getEnv('credentials.overwrite.endpoint');
this.push = getPushInstance();
this.push = Container.get(Push);
if (process.env.E2E_TESTS === 'true') {
this.app.use('/e2e', require('./api/e2e.api').e2eController);
@ -358,7 +354,7 @@ class Server extends AbstractServer {
setupAuthMiddlewares(app, ignoredEndpoints, this.restEndpoint, repositories.User);
const logger = LoggerProxy;
const internalHooks = InternalHooksManager.getInstance();
const internalHooks = Container.get(InternalHooks);
const mailer = getMailerInstance();
const postHog = this.postHog;
@ -1182,9 +1178,7 @@ class Server extends AbstractServer {
`/${this.restEndpoint}/settings`,
ResponseHelper.send(
async (req: express.Request, res: express.Response): Promise<IN8nUISettings> => {
void InternalHooksManager.getInstance().onFrontendSettingsAPI(
req.headers.sessionid as string,
);
void Container.get(InternalHooks).onFrontendSettingsAPI(req.headers.sessionid as string);
return this.getSettingsForFrontend();
},
@ -1355,6 +1349,6 @@ export async function start(): Promise<void> {
order: { createdAt: 'ASC' },
where: {},
}).then(async (workflow) =>
InternalHooksManager.getInstance().onServerStarted(diagnosticInfo, workflow?.createdAt),
Container.get(InternalHooks).onServerStarted(diagnosticInfo, workflow?.createdAt),
);
}

View file

@ -2,8 +2,7 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
/* eslint-disable no-param-reassign */
import type express from 'express';
import { ActiveWebhooks } from 'n8n-core';
import { Service } from 'typedi';
import type {
IWebhookData,
@ -13,16 +12,18 @@ import type {
WorkflowActivateMode,
WorkflowExecuteMode,
} from 'n8n-workflow';
import { ActiveWebhooks } from '@/ActiveWebhooks';
import type { IResponseCallbackData, IWorkflowDb } from '@/Interfaces';
import type { Push } from '@/push';
import { getPushInstance } from '@/push';
import { Push } from '@/push';
import * as ResponseHelper from '@/ResponseHelper';
import * as WebhookHelpers from '@/WebhookHelpers';
const WEBHOOK_TEST_UNREGISTERED_HINT =
"Click the 'Execute workflow' button on the canvas, then try again. (In test mode, the webhook only works for one call after you click this button)";
class TestWebhooks {
@Service()
export class TestWebhooks {
private testWebhookData: {
[key: string]: {
sessionId?: string;
@ -286,13 +287,3 @@ class TestWebhooks {
return this.activeWebhooks.removeAll(workflows);
}
}
let testWebhooksInstance: TestWebhooks | undefined;
export function getInstance(): TestWebhooks {
if (testWebhooksInstance === undefined) {
testWebhooksInstance = new TestWebhooks(new ActiveWebhooks(), getPushInstance());
}
return testWebhooksInstance;
}

View file

@ -17,7 +17,7 @@ import { DateUtils } from 'typeorm/util/DateUtils';
import config from '@/config';
import * as Db from '@/Db';
import * as ResponseHelper from '@/ResponseHelper';
import * as ActiveExecutions from '@/ActiveExecutions';
import { ActiveExecutions } from '@/ActiveExecutions';
import type {
IExecutionFlattedDb,
IExecutionsStopData,
@ -25,9 +25,11 @@ import type {
} from '@/Interfaces';
import { WorkflowRunner } from '@/WorkflowRunner';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { Container, Service } from 'typedi';
export class WaitTrackerClass {
activeExecutionsInstance: ActiveExecutions.ActiveExecutions;
@Service()
export class WaitTracker {
activeExecutionsInstance: ActiveExecutions;
private waitingExecutions: {
[key: string]: {
@ -39,7 +41,7 @@ export class WaitTrackerClass {
mainTimer: NodeJS.Timeout;
constructor() {
this.activeExecutionsInstance = ActiveExecutions.getInstance();
this.activeExecutionsInstance = Container.get(ActiveExecutions);
// Poll every 60 seconds a list of upcoming executions
this.mainTimer = setInterval(() => {
@ -189,13 +191,3 @@ export class WaitTrackerClass {
});
}
}
let waitTrackerInstance: WaitTrackerClass | undefined;
export function WaitTracker(): WaitTrackerClass {
if (waitTrackerInstance === undefined) {
waitTrackerInstance = new WaitTrackerClass();
}
return waitTrackerInstance;
}

View file

@ -13,6 +13,7 @@ import { NodeTypes } from '@/NodeTypes';
import type { IExecutionResponse, IResponseCallbackData, IWorkflowDb } from '@/Interfaces';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { Container } from 'typedi';
export class WaitingWebhooks {
async executeWebhook(
@ -78,7 +79,7 @@ export class WaitingWebhooks {
const { workflowData } = fullExecutionData;
const nodeTypes = NodeTypes();
const nodeTypes = Container.get(NodeTypes);
const workflow = new Workflow({
id: workflowData.id!.toString(),
name: workflowData.name,

View file

@ -52,10 +52,11 @@ import * as ResponseHelper from '@/ResponseHelper';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import { WorkflowRunner } from '@/WorkflowRunner';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import * as ActiveExecutions from '@/ActiveExecutions';
import { ActiveExecutions } from '@/ActiveExecutions';
import type { User } from '@db/entities/User';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { Container } from 'typedi';
export const WEBHOOK_METHODS = ['DELETE', 'GET', 'HEAD', 'PATCH', 'POST', 'PUT'];
@ -460,7 +461,7 @@ export async function executeWebhook(
);
// Get a promise which resolves when the workflow did execute and send then response
const executePromise = ActiveExecutions.getInstance().getPostExecutePromise(
const executePromise = Container.get(ActiveExecutions).getPostExecutePromise(
executionId,
) as Promise<IExecutionDb | undefined>;
executePromise

View file

@ -48,7 +48,7 @@ import { LessThanOrEqual } from 'typeorm';
import { DateUtils } from 'typeorm/util/DateUtils';
import config from '@/config';
import * as Db from '@/Db';
import * as ActiveExecutions from '@/ActiveExecutions';
import { ActiveExecutions } from '@/ActiveExecutions';
import { CredentialsHelper } from '@/CredentialsHelper';
import { ExternalHooks } from '@/ExternalHooks';
import type {
@ -60,9 +60,8 @@ import type {
IWorkflowExecutionDataProcess,
IWorkflowErrorData,
} from '@/Interfaces';
import { InternalHooksManager } from '@/InternalHooksManager';
import { NodeTypes } from '@/NodeTypes';
import { getPushInstance } from '@/push';
import { Push } from '@/push';
import * as ResponseHelper from '@/ResponseHelper';
import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowHelpers from '@/WorkflowHelpers';
@ -70,6 +69,8 @@ import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { findSubworkflowStart } from '@/utils';
import { PermissionChecker } from './UserManagement/PermissionChecker';
import { WorkflowsService } from './workflows/workflows.services';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType');
@ -280,7 +281,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
workflowId: this.workflowData.id,
});
const pushInstance = getPushInstance();
const pushInstance = Container.get(Push);
pushInstance.send('nodeExecuteBefore', { executionId, nodeName }, sessionId);
},
],
@ -298,7 +299,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
workflowId: this.workflowData.id,
});
const pushInstance = getPushInstance();
const pushInstance = Container.get(Push);
pushInstance.send('nodeExecuteAfter', { executionId, nodeName, data }, sessionId);
},
],
@ -315,7 +316,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
if (sessionId === undefined) {
return;
}
const pushInstance = getPushInstance();
const pushInstance = Container.get(Push);
pushInstance.send(
'executionStarted',
{
@ -381,7 +382,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
retryOf,
};
const pushInstance = getPushInstance();
const pushInstance = Container.get(Push);
pushInstance.send('executionFinished', sendData, sessionId);
},
],
@ -389,7 +390,7 @@ function hookFunctionsPush(): IWorkflowExecuteHooks {
}
export function hookFunctionsPreExecute(parentProcessMode?: string): IWorkflowExecuteHooks {
const externalHooks = ExternalHooks();
const externalHooks = Container.get(ExternalHooks);
return {
workflowExecuteBefore: [
@ -923,10 +924,10 @@ async function executeWorkflow(
parentWorkflowSettings?: IWorkflowSettings;
},
): Promise<Array<INodeExecutionData[] | null> | IWorkflowExecuteProcess> {
const externalHooks = ExternalHooks();
const externalHooks = Container.get(ExternalHooks);
await externalHooks.init();
const nodeTypes = NodeTypes();
const nodeTypes = Container.get(NodeTypes);
const workflowData =
options.loadedWorkflowData ??
@ -956,10 +957,10 @@ async function executeWorkflow(
executionId =
options.parentExecutionId !== undefined
? options.parentExecutionId
: await ActiveExecutions.getInstance().add(runData);
: await Container.get(ActiveExecutions).add(runData);
}
void InternalHooksManager.getInstance().onWorkflowBeforeExecute(executionId || '', runData);
void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId || '', runData);
let data;
try {
@ -1062,7 +1063,7 @@ async function executeWorkflow(
await externalHooks.run('workflow.postExecute', [data, workflowData, executionId]);
void InternalHooksManager.getInstance().onWorkflowPostExecute(
void Container.get(InternalHooks).onWorkflowPostExecute(
executionId,
workflowData,
data,
@ -1072,11 +1073,11 @@ async function executeWorkflow(
if (data.finished === true) {
// Workflow did finish successfully
ActiveExecutions.getInstance().remove(executionId, data);
Container.get(ActiveExecutions).remove(executionId, data);
const returnData = WorkflowHelpers.getDataLastExecutedNodeData(data);
return returnData!.data!.main;
}
ActiveExecutions.getInstance().remove(executionId, data);
Container.get(ActiveExecutions).remove(executionId, data);
// Workflow did fail
const { error } = data.data.resultData;
// eslint-disable-next-line @typescript-eslint/no-throw-literal
@ -1092,7 +1093,7 @@ export function setExecutionStatus(status: ExecutionStatus) {
return;
}
Logger.debug(`Setting execution status for ${this.executionId} to "${status}"`);
ActiveExecutions.getInstance()
Container.get(ActiveExecutions)
.setStatus(this.executionId, status)
.catch((error) => {
Logger.debug(`Setting execution status "${status}" failed: ${error.message}`);
@ -1108,7 +1109,7 @@ export function sendMessageToUI(source: string, messages: any[]) {
// Push data to session which started workflow
try {
const pushInstance = getPushInstance();
const pushInstance = Container.get(Push);
pushInstance.send(
'sendConsoleMessage',
{
@ -1229,7 +1230,7 @@ export function getWorkflowHooksWorkerMain(
this: WorkflowHooks,
nodeName: string,
): Promise<void> {
void InternalHooksManager.getInstance().onNodeBeforeExecute(
void Container.get(InternalHooks).onNodeBeforeExecute(
this.executionId,
this.workflowData,
nodeName,
@ -1239,7 +1240,7 @@ export function getWorkflowHooksWorkerMain(
this: WorkflowHooks,
nodeName: string,
): Promise<void> {
void InternalHooksManager.getInstance().onNodePostExecute(
void Container.get(InternalHooks).onNodePostExecute(
this.executionId,
this.workflowData,
nodeName,
@ -1281,7 +1282,7 @@ export function getWorkflowHooksMain(
this: WorkflowHooks,
nodeName: string,
): Promise<void> {
void InternalHooksManager.getInstance().onNodeBeforeExecute(
void Container.get(InternalHooks).onNodeBeforeExecute(
this.executionId,
this.workflowData,
nodeName,
@ -1292,7 +1293,7 @@ export function getWorkflowHooksMain(
this: WorkflowHooks,
nodeName: string,
): Promise<void> {
void InternalHooksManager.getInstance().onNodePostExecute(
void Container.get(InternalHooks).onNodePostExecute(
this.executionId,
this.workflowData,
nodeName,

View file

@ -32,6 +32,7 @@ import type { User } from '@db/entities/User';
import { whereClause } from '@/UserManagement/UserManagementHelper';
import omit from 'lodash.omit';
import { PermissionChecker } from './UserManagement/PermissionChecker';
import { Container } from 'typedi';
const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType');
@ -108,7 +109,7 @@ export async function executeErrorWorkflow(
}
const executionMode = 'error';
const nodeTypes = NodeTypes();
const nodeTypes = Container.get(NodeTypes);
const workflowInstance = new Workflow({
id: workflowId,

View file

@ -33,7 +33,7 @@ import PCancelable from 'p-cancelable';
import { join as pathJoin } from 'path';
import { fork } from 'child_process';
import * as ActiveExecutions from '@/ActiveExecutions';
import { ActiveExecutions } from '@/ActiveExecutions';
import config from '@/config';
import * as Db from '@/Db';
import { ExternalHooks } from '@/ExternalHooks';
@ -49,25 +49,25 @@ import * as ResponseHelper from '@/ResponseHelper';
import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { InternalHooksManager } from '@/InternalHooksManager';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { initErrorHandling } from '@/ErrorReporting';
import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import type { Push } from '@/push';
import { getPushInstance } from '@/push';
import { Push } from '@/push';
import { eventBus } from './eventbus';
import { recoverExecutionDataFromEventLogMessages } from './eventbus/MessageEventBus/recoverEvents';
import { Container } from 'typedi';
import { InternalHooks } from './InternalHooks';
export class WorkflowRunner {
activeExecutions: ActiveExecutions.ActiveExecutions;
activeExecutions: ActiveExecutions;
push: Push;
jobQueue: Queue.JobQueue;
constructor() {
this.push = getPushInstance();
this.activeExecutions = ActiveExecutions.getInstance();
this.push = Container.get(Push);
this.activeExecutions = Container.get(ActiveExecutions);
}
/**
@ -130,7 +130,7 @@ export class WorkflowRunner {
const executionFlattedData = await Db.collections.Execution.findOneBy({ id: executionId });
void InternalHooksManager.getInstance().onWorkflowCrashed(
void Container.get(InternalHooks).onWorkflowCrashed(
executionId,
executionMode,
executionFlattedData?.workflowData,
@ -187,14 +187,14 @@ export class WorkflowRunner {
executionId = await this.runSubprocess(data, loadStaticData, executionId, responsePromise);
}
void InternalHooksManager.getInstance().onWorkflowBeforeExecute(executionId, data);
void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId, data);
const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId);
const externalHooks = ExternalHooks();
const externalHooks = Container.get(ExternalHooks);
postExecutePromise
.then(async (executionData) => {
void InternalHooksManager.getInstance().onWorkflowPostExecute(
void Container.get(InternalHooks).onWorkflowPostExecute(
executionId!,
data.workflowData,
executionData,
@ -241,7 +241,7 @@ export class WorkflowRunner {
data.workflowData.staticData = await WorkflowHelpers.getStaticDataById(workflowId);
}
const nodeTypes = NodeTypes();
const nodeTypes = Container.get(NodeTypes);
// Soft timeout to stop workflow execution after current running node
// Changes were made by adding the `workflowTimeout` to the `additionalData`

View file

@ -7,6 +7,8 @@
/* eslint-disable @typescript-eslint/no-use-before-define */
/* eslint-disable @typescript-eslint/unbound-method */
import 'source-map-support/register';
import 'reflect-metadata';
import { Container } from 'typedi';
import type { IProcessMessage } from 'n8n-core';
import { BinaryDataManager, UserSettings, WorkflowExecute } from 'n8n-core';
@ -49,11 +51,11 @@ import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'
import { getLogger } from '@/Logger';
import config from '@/config';
import { InternalHooksManager } from '@/InternalHooksManager';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { initErrorHandling } from '@/ErrorReporting';
import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import { getLicense } from './License';
import { InternalHooks } from './InternalHooks';
import { PostHogClient } from './posthog';
class WorkflowRunnerProcess {
@ -104,23 +106,20 @@ class WorkflowRunnerProcess {
const userSettings = await UserSettings.prepareUserSettings();
const loadNodesAndCredentials = LoadNodesAndCredentials();
const loadNodesAndCredentials = Container.get(LoadNodesAndCredentials);
await loadNodesAndCredentials.init();
const nodeTypes = NodeTypes(loadNodesAndCredentials);
const credentialTypes = CredentialTypes(loadNodesAndCredentials);
const nodeTypes = Container.get(NodeTypes);
const credentialTypes = Container.get(CredentialTypes);
CredentialsOverwrites(credentialTypes);
// Load all external hooks
const externalHooks = ExternalHooks();
const externalHooks = Container.get(ExternalHooks);
await externalHooks.init();
const instanceId = userSettings.instanceId ?? '';
const postHog = new PostHogClient();
await postHog.init(instanceId);
await InternalHooksManager.init(instanceId, nodeTypes, postHog);
await Container.get(PostHogClient).init(instanceId);
await Container.get(InternalHooks).init(instanceId);
const binaryDataConfig = config.getEnv('binaryDataManager');
await BinaryDataManager.init(binaryDataConfig);
@ -234,7 +233,7 @@ class WorkflowRunnerProcess {
};
});
void InternalHooksManager.getInstance().onWorkflowBeforeExecute(executionId || '', runData);
void Container.get(InternalHooks).onWorkflowBeforeExecute(executionId || '', runData);
let result: IRun;
try {
@ -255,7 +254,7 @@ class WorkflowRunnerProcess {
const { workflow } = executeWorkflowFunctionOutput;
result = await workflowExecute.processRunExecutionData(workflow);
await externalHooks.run('workflow.postExecute', [result, workflowData, executionId]);
void InternalHooksManager.getInstance().onWorkflowPostExecute(
void Container.get(InternalHooks).onWorkflowPostExecute(
executionId,
workflowData,
result,
@ -513,6 +512,8 @@ process.on('message', async (message: IProcessMessage) => {
workflowRunner.executionIdCallback(message.data.executionId);
}
} catch (error) {
workflowRunner.logger.error(error.message);
// Catch all uncaught errors and forward them to parent process
const executionError = {
...error,

View file

@ -8,6 +8,7 @@ import config from '@/config';
import { NodeTypes } from '@/NodeTypes';
import * as ResponseHelper from '@/ResponseHelper';
import { getNodeTranslationPath } from '@/TranslationHelpers';
import { Container } from 'typedi';
export const nodeTypesController = express.Router();
@ -21,7 +22,7 @@ nodeTypesController.post(
if (defaultLocale === 'en') {
return nodeInfos.reduce<INodeTypeDescription[]>((acc, { name, version }) => {
const { description } = NodeTypes().getByNameAndVersion(name, version);
const { description } = Container.get(NodeTypes).getByNameAndVersion(name, version);
acc.push(description);
return acc;
}, []);
@ -32,7 +33,7 @@ nodeTypesController.post(
version: number,
nodeTypes: INodeTypeDescription[],
) {
const { description, sourcePath } = NodeTypes().getWithSourcePath(name, version);
const { description, sourcePath } = Container.get(NodeTypes).getWithSourcePath(name, version);
const translationPath = await getNodeTranslationPath({
nodeSourcePath: sourcePath,
longNodeType: description.name,

View file

@ -2,7 +2,6 @@ import express from 'express';
import type { PublicInstalledPackage } from 'n8n-workflow';
import config from '@/config';
import { InternalHooksManager } from '@/InternalHooksManager';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import * as ResponseHelper from '@/ResponseHelper';
@ -33,7 +32,9 @@ import { isAuthenticatedRequest } from '@/UserManagement/UserManagementHelper';
import type { InstalledPackages } from '@db/entities/InstalledPackages';
import type { CommunityPackages } from '@/Interfaces';
import type { NodeRequest } from '@/requests';
import { getPushInstance } from '@/push';
import { Push } from '@/push';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
const { PACKAGE_NOT_INSTALLED, PACKAGE_NAME_NOT_PROVIDED } = RESPONSE_ERROR_MESSAGES;
@ -116,14 +117,14 @@ nodesController.post(
let installedPackage: InstalledPackages;
try {
installedPackage = await LoadNodesAndCredentials().loadNpmModule(
installedPackage = await Container.get(LoadNodesAndCredentials).loadNpmModule(
parsed.packageName,
parsed.version,
);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : UNKNOWN_FAILURE_REASON;
void InternalHooksManager.getInstance().onCommunityPackageInstallFinished({
void Container.get(InternalHooks).onCommunityPackageInstallFinished({
user: req.user,
input_string: name,
package_name: parsed.packageName,
@ -141,7 +142,7 @@ nodesController.post(
if (!hasLoaded) removePackageFromMissingList(name);
const pushInstance = getPushInstance();
const pushInstance = Container.get(Push);
// broadcast to connected frontends that node list has been updated
installedPackage.installedNodes.forEach((node) => {
@ -151,7 +152,7 @@ nodesController.post(
});
});
void InternalHooksManager.getInstance().onCommunityPackageInstallFinished({
void Container.get(InternalHooks).onCommunityPackageInstallFinished({
user: req.user,
input_string: name,
package_name: parsed.packageName,
@ -238,7 +239,7 @@ nodesController.delete(
}
try {
await LoadNodesAndCredentials().removeNpmModule(name, installedPackage);
await Container.get(LoadNodesAndCredentials).removeNpmModule(name, installedPackage);
} catch (error) {
const message = [
`Error removing package "${name}"`,
@ -248,7 +249,7 @@ nodesController.delete(
throw new ResponseHelper.InternalServerError(message);
}
const pushInstance = getPushInstance();
const pushInstance = Container.get(Push);
// broadcast to connected frontends that node list has been updated
installedPackage.installedNodes.forEach((node) => {
@ -258,7 +259,7 @@ nodesController.delete(
});
});
void InternalHooksManager.getInstance().onCommunityPackageDeleteFinished({
void Container.get(InternalHooks).onCommunityPackageDeleteFinished({
user: req.user,
package_name: name,
package_version: installedPackage.installedVersion,
@ -290,12 +291,12 @@ nodesController.patch(
}
try {
const newInstalledPackage = await LoadNodesAndCredentials().updateNpmModule(
const newInstalledPackage = await Container.get(LoadNodesAndCredentials).updateNpmModule(
parseNpmPackageName(name).packageName,
previouslyInstalledPackage,
);
const pushInstance = getPushInstance();
const pushInstance = Container.get(Push);
// broadcast to connected frontends that node list has been updated
previouslyInstalledPackage.installedNodes.forEach((node) => {
@ -312,7 +313,7 @@ nodesController.patch(
});
});
void InternalHooksManager.getInstance().onCommunityPackageUpdateFinished({
void Container.get(InternalHooks).onCommunityPackageUpdateFinished({
user: req.user,
package_name: name,
package_version_current: previouslyInstalledPackage.installedVersion,
@ -325,7 +326,7 @@ nodesController.patch(
return newInstalledPackage;
} catch (error) {
previouslyInstalledPackage.installedNodes.forEach((node) => {
const pushInstance = getPushInstance();
const pushInstance = Container.get(Push);
pushInstance.send('removeNodeType', {
name: node.type,
version: node.latestVersion,

View file

@ -9,15 +9,14 @@ import express from 'express';
import * as Db from '@/Db';
import { ExternalHooks } from '@/ExternalHooks';
import type { IExternalHooksClass, ITagWithCountDb } from '@/Interfaces';
import type { ITagWithCountDb } from '@/Interfaces';
import * as ResponseHelper from '@/ResponseHelper';
import config from '@/config';
import * as TagHelpers from '@/TagHelpers';
import { validateEntity } from '@/GenericHelpers';
import { TagEntity } from '@db/entities/TagEntity';
import type { TagsRequest } from '@/requests';
export const externalHooks: IExternalHooksClass = ExternalHooks();
import { Container } from 'typedi';
export const tagsController = express.Router();
@ -50,12 +49,12 @@ tagsController.post(
const newTag = new TagEntity();
newTag.name = req.body.name.trim();
await externalHooks.run('tag.beforeCreate', [newTag]);
await Container.get(ExternalHooks).run('tag.beforeCreate', [newTag]);
await validateEntity(newTag);
const tag = await Db.collections.Tag.save(newTag);
await externalHooks.run('tag.afterCreate', [tag]);
await Container.get(ExternalHooks).run('tag.afterCreate', [tag]);
return tag;
}),
@ -74,12 +73,12 @@ tagsController.patch(
newTag.id = id;
newTag.name = name.trim();
await externalHooks.run('tag.beforeUpdate', [newTag]);
await Container.get(ExternalHooks).run('tag.beforeUpdate', [newTag]);
await validateEntity(newTag);
const tag = await Db.collections.Tag.save(newTag);
await externalHooks.run('tag.afterUpdate', [tag]);
await Container.get(ExternalHooks).run('tag.afterUpdate', [tag]);
return tag;
}),
@ -100,11 +99,11 @@ tagsController.delete(
}
const id = req.params.id;
await externalHooks.run('tag.beforeDelete', [id]);
await Container.get(ExternalHooks).run('tag.beforeDelete', [id]);
await Db.collections.Tag.delete({ id });
await externalHooks.run('tag.afterDelete', [id]);
await Container.get(ExternalHooks).run('tag.afterDelete', [id]);
return true;
}),

View file

@ -12,6 +12,7 @@ import {
} from '@/audit/constants';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { Risk } from '@/audit/types';
import { Container } from 'typedi';
async function getCommunityNodeDetails() {
const installedPackages = await getAllInstalledPackages();
@ -32,7 +33,8 @@ async function getCommunityNodeDetails() {
async function getCustomNodeDetails() {
const customNodeTypes: Risk.CustomNodeDetails[] = [];
for (const customDir of LoadNodesAndCredentials().getCustomDirectories()) {
const nodesAndCredentials = Container.get(LoadNodesAndCredentials);
for (const customDir of nodesAndCredentials.getCustomDirectories()) {
const customNodeFiles = await glob('**/*.node.js', { cwd: customDir, absolute: true });
for (const nodeFile of customNodeFiles) {

View file

@ -1,8 +1,9 @@
import * as Db from '@/Db';
import type { User } from '@db/entities/User';
import { compareHash } from '@/UserManagement/UserManagementHelper';
import { InternalHooksManager } from '@/InternalHooksManager';
import * as ResponseHelper from '@/ResponseHelper';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
export const handleEmailLogin = async (
email: string,
@ -21,7 +22,7 @@ export const handleEmailLogin = async (
// so suggest to reset the password to gain access to the instance.
const ldapIdentity = user?.authIdentities?.find((i) => i.providerType === 'ldap');
if (user && ldapIdentity) {
void InternalHooksManager.getInstance().userLoginFailedDueToLdapDisabled({
void Container.get(InternalHooks).userLoginFailedDueToLdapDisabled({
user_id: user.id,
});

View file

@ -1,4 +1,4 @@
import { InternalHooksManager } from '@/InternalHooksManager';
import { InternalHooks } from '@/InternalHooks';
import {
createLdapUserOnLocalDb,
findAndAuthenticateLdapUser,
@ -12,6 +12,7 @@ import {
updateLdapUserOnLocalDb,
} from '@/Ldap/helpers';
import type { User } from '@db/entities/User';
import { Container } from 'typedi';
export const handleLdapLogin = async (
loginId: string,
@ -51,7 +52,7 @@ export const handleLdapLogin = async (
} else {
const role = await getLdapUserRole();
const user = await createLdapUserOnLocalDb(role, ldapAttributesValues, ldapId);
void InternalHooksManager.getInstance().onUserSignup(user, {
void Container.get(InternalHooks).onUserSignup(user, {
user_type: 'ldap',
was_disabled_ldap_user: false,
});

View file

@ -1,5 +1,6 @@
import { Command } from '@oclif/command';
import { ExitError } from '@oclif/errors';
import { Container } from 'typedi';
import type { INodeTypes } from 'n8n-workflow';
import { LoggerProxy, ErrorReporterProxy as ErrorReporter, sleep } from 'n8n-workflow';
import type { IUserSettings } from 'n8n-core';
@ -11,13 +12,12 @@ import * as CrashJournal from '@/CrashJournal';
import { inTest } from '@/constants';
import { CredentialTypes } from '@/CredentialTypes';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
import { InternalHooksManager } from '@/InternalHooksManager';
import { initErrorHandling } from '@/ErrorReporting';
import { ExternalHooks } from '@/ExternalHooks';
import { NodeTypes } from '@/NodeTypes';
import type { LoadNodesAndCredentialsClass } from '@/LoadNodesAndCredentials';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import type { IExternalHooksClass } from '@/Interfaces';
import { InternalHooks } from '@/InternalHooks';
import { PostHogClient } from '@/posthog';
export const UM_FIX_INSTRUCTION =
@ -28,7 +28,7 @@ export abstract class BaseCommand extends Command {
protected externalHooks: IExternalHooksClass;
protected loadNodesAndCredentials: LoadNodesAndCredentialsClass;
protected loadNodesAndCredentials: LoadNodesAndCredentials;
protected nodeTypes: INodeTypes;
@ -43,17 +43,15 @@ export abstract class BaseCommand extends Command {
// Make sure the settings exist
this.userSettings = await UserSettings.prepareUserSettings();
this.loadNodesAndCredentials = LoadNodesAndCredentials();
this.loadNodesAndCredentials = Container.get(LoadNodesAndCredentials);
await this.loadNodesAndCredentials.init();
this.nodeTypes = NodeTypes(this.loadNodesAndCredentials);
const credentialTypes = CredentialTypes(this.loadNodesAndCredentials);
this.nodeTypes = Container.get(NodeTypes);
const credentialTypes = Container.get(CredentialTypes);
CredentialsOverwrites(credentialTypes);
const instanceId = this.userSettings.instanceId ?? '';
const postHog = new PostHogClient();
await postHog.init(instanceId);
await InternalHooksManager.init(instanceId, this.nodeTypes, postHog);
await Container.get(PostHogClient).init(instanceId);
await Container.get(InternalHooks).init(instanceId);
await Db.init().catch(async (error: Error) =>
this.exitWithCrash('There was an error initializing DB', error),
@ -88,7 +86,7 @@ export abstract class BaseCommand extends Command {
}
protected async initExternalHooks() {
this.externalHooks = ExternalHooks();
this.externalHooks = Container.get(ExternalHooks);
await this.externalHooks.init();
}

View file

@ -1,10 +1,11 @@
import { flags } from '@oclif/command';
import { audit } from '@/audit';
import { RISK_CATEGORIES } from '@/audit/constants';
import { InternalHooksManager } from '@/InternalHooksManager';
import config from '@/config';
import type { Risk } from '@/audit/types';
import { BaseCommand } from './BaseCommand';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
export class SecurityAudit extends BaseCommand {
static description = 'Generate a security audit report for this n8n instance';
@ -56,7 +57,7 @@ export class SecurityAudit extends BaseCommand {
process.stdout.write(JSON.stringify(result, null, 2));
}
void InternalHooksManager.getInstance().onAuditGeneratedViaCli();
void Container.get(InternalHooks).onAuditGeneratedViaCli();
}
async catch(error: Error) {

View file

@ -4,7 +4,7 @@ import { PLACEHOLDER_EMPTY_WORKFLOW_ID } from 'n8n-core';
import type { IWorkflowBase } from 'n8n-workflow';
import { ExecutionBaseError } from 'n8n-workflow';
import * as ActiveExecutions from '@/ActiveExecutions';
import { ActiveExecutions } from '@/ActiveExecutions';
import * as Db from '@/Db';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import { WorkflowRunner } from '@/WorkflowRunner';
@ -13,6 +13,7 @@ import { getInstanceOwner } from '@/UserManagement/UserManagementHelper';
import { findCliWorkflowStart } from '@/utils';
import { initEvents } from '@/events';
import { BaseCommand } from './BaseCommand';
import { Container } from 'typedi';
export class Execute extends BaseCommand {
static description = '\nExecutes a given workflow';
@ -117,7 +118,7 @@ export class Execute extends BaseCommand {
const workflowRunner = new WorkflowRunner();
const executionId = await workflowRunner.run(runData);
const activeExecutions = ActiveExecutions.getInstance();
const activeExecutions = Container.get(ActiveExecutions);
const data = await activeExecutions.getPostExecutePromise(executionId);
if (data === undefined) {

View file

@ -7,7 +7,7 @@ import { sep } from 'path';
import { diff } from 'json-diff';
import pick from 'lodash.pick';
import * as ActiveExecutions from '@/ActiveExecutions';
import { ActiveExecutions } from '@/ActiveExecutions';
import * as Db from '@/Db';
import { WorkflowRunner } from '@/WorkflowRunner';
import type { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/Interfaces';
@ -16,6 +16,7 @@ import { getInstanceOwner } from '@/UserManagement/UserManagementHelper';
import { findCliWorkflowStart } from '@/utils';
import { initEvents } from '@/events';
import { BaseCommand } from './BaseCommand';
import { Container } from 'typedi';
const re = /\d+/;
@ -101,7 +102,7 @@ export class ExecuteBatch extends BaseCommand {
}
ExecuteBatch.cancelled = true;
const activeExecutionsInstance = ActiveExecutions.getInstance();
const activeExecutionsInstance = Container.get(ActiveExecutions);
const stopPromises = activeExecutionsInstance
.getActiveExecutions()
.map(async (execution) => activeExecutionsInstance.stopExecution(execution.id));
@ -597,7 +598,7 @@ export class ExecuteBatch extends BaseCommand {
const workflowRunner = new WorkflowRunner();
const executionId = await workflowRunner.run(runData);
const activeExecutions = ActiveExecutions.getInstance();
const activeExecutions = Container.get(ActiveExecutions);
const data = await activeExecutions.getPostExecutePromise(executionId);
if (gotCancel || ExecuteBatch.cancelled) {
clearTimeout(timeoutTimer);

View file

@ -1,6 +1,7 @@
/* eslint-disable @typescript-eslint/await-thenable */
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import { Container } from 'typedi';
import path from 'path';
import { mkdir } from 'fs/promises';
import { createReadStream, createWriteStream, existsSync } from 'fs';
@ -16,26 +17,23 @@ import { LoggerProxy, sleep, jsonParse } from 'n8n-workflow';
import { createHash } from 'crypto';
import config from '@/config';
import * as ActiveExecutions from '@/ActiveExecutions';
import * as ActiveWorkflowRunner from '@/ActiveWorkflowRunner';
import { ActiveExecutions } from '@/ActiveExecutions';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import * as Db from '@/Db';
import * as GenericHelpers from '@/GenericHelpers';
import { InternalHooksManager } from '@/InternalHooksManager';
import * as Server from '@/Server';
import * as TestWebhooks from '@/TestWebhooks';
import { WaitTracker } from '@/WaitTracker';
import { TestWebhooks } from '@/TestWebhooks';
import { getAllInstalledPackages } from '@/CommunityNodes/packageModel';
import { handleLdapInit } from '@/Ldap/helpers';
import { EDITOR_UI_DIST_DIR, GENERATED_STATIC_DIR } from '@/constants';
import { eventBus } from '@/eventbus';
import { BaseCommand } from './BaseCommand';
import { InternalHooks } from '@/InternalHooks';
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-var-requires
const open = require('open');
const pipeline = promisify(stream.pipeline);
let activeWorkflowRunner: ActiveWorkflowRunner.ActiveWorkflowRunner | undefined;
export class Start extends BaseCommand {
static description = 'Starts n8n. Makes Web-UI available and starts active workflows';
@ -62,6 +60,8 @@ export class Start extends BaseCommand {
}),
};
protected activeWorkflowRunner = Container.get(ActiveWorkflowRunner);
/**
* Opens the UI in browser
*/
@ -86,7 +86,7 @@ export class Start extends BaseCommand {
try {
// Stop with trying to activate workflows that could not be activated
activeWorkflowRunner?.removeAllQueuedWorkflowActivations();
this.activeWorkflowRunner.removeAllQueuedWorkflowActivations();
await this.externalHooks.run('n8n.stop', []);
@ -97,25 +97,25 @@ export class Start extends BaseCommand {
await this.exitSuccessFully();
}, 30000);
await InternalHooksManager.getInstance().onN8nStop();
await Container.get(InternalHooks).onN8nStop();
const skipWebhookDeregistration = config.getEnv(
'endpoints.skipWebhooksDeregistrationOnShutdown',
);
const removePromises = [];
if (activeWorkflowRunner !== undefined && !skipWebhookDeregistration) {
removePromises.push(activeWorkflowRunner.removeAll());
if (!skipWebhookDeregistration) {
removePromises.push(this.activeWorkflowRunner.removeAll());
}
// Remove all test webhooks
const testWebhooks = TestWebhooks.getInstance();
const testWebhooks = Container.get(TestWebhooks);
removePromises.push(testWebhooks.removeAll());
await Promise.all(removePromises);
// Wait for active workflow executions to finish
const activeExecutionsInstance = ActiveExecutions.getInstance();
const activeExecutionsInstance = Container.get(ActiveExecutions);
let executingWorkflows = activeExecutionsInstance.getActiveExecutions();
let count = 0;
@ -329,10 +329,7 @@ export class Start extends BaseCommand {
await Server.start();
// Start to get active workflows and run their triggers
activeWorkflowRunner = ActiveWorkflowRunner.getInstance();
await activeWorkflowRunner.init();
WaitTracker();
await this.activeWorkflowRunner.init();
await handleLdapInit();
@ -378,6 +375,7 @@ export class Start extends BaseCommand {
}
async catch(error: Error) {
console.log(error.stack);
await this.exitWithCrash('Exiting due to an error.', error);
}
}

View file

@ -1,9 +1,10 @@
import { flags } from '@oclif/command';
import { LoggerProxy, sleep } from 'n8n-workflow';
import config from '@/config';
import * as ActiveExecutions from '@/ActiveExecutions';
import { ActiveExecutions } from '@/ActiveExecutions';
import { WebhookServer } from '@/WebhookServer';
import { BaseCommand } from './BaseCommand';
import { Container } from 'typedi';
export class Webhook extends BaseCommand {
static description = 'Starts n8n webhook process. Intercepts only production URLs.';
@ -32,7 +33,7 @@ export class Webhook extends BaseCommand {
}, 30000);
// Wait for active workflow executions to finish
const activeExecutionsInstance = ActiveExecutions.getInstance();
const activeExecutionsInstance = Container.get(ActiveExecutions);
let executingWorkflows = activeExecutionsInstance.getActiveExecutions();
let count = 0;

View file

@ -2,7 +2,6 @@ import express from 'express';
import type { INodeCredentialTestResult } from 'n8n-workflow';
import { deepCopy, LoggerProxy } from 'n8n-workflow';
import * as Db from '@/Db';
import { InternalHooksManager } from '@/InternalHooksManager';
import * as ResponseHelper from '@/ResponseHelper';
import type { CredentialsEntity } from '@db/entities/CredentialsEntity';
@ -10,6 +9,8 @@ import type { CredentialRequest } from '@/requests';
import { isSharingEnabled, rightDiff } from '@/UserManagement/UserManagementHelper';
import { EECredentialsService as EECredentials } from './credentials.service.ee';
import type { CredentialWithSharings } from './credentials.types';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
// eslint-disable-next-line @typescript-eslint/naming-convention
export const EECredentialsController = express.Router();
@ -174,7 +175,7 @@ EECredentialsController.put(
}
});
void InternalHooksManager.getInstance().onUserSharedCredentials({
void Container.get(InternalHooks).onUserSharedCredentials({
user: req.user,
credential_name: credential.name,
credential_type: credential.type,

View file

@ -5,7 +5,6 @@ import type { INodeCredentialTestResult } from 'n8n-workflow';
import { deepCopy, LoggerProxy } from 'n8n-workflow';
import * as GenericHelpers from '@/GenericHelpers';
import { InternalHooksManager } from '@/InternalHooksManager';
import * as ResponseHelper from '@/ResponseHelper';
import config from '@/config';
import { getLogger } from '@/Logger';
@ -14,6 +13,8 @@ import { CredentialsService } from './credentials.service';
import type { ICredentialsDb } from '@/Interfaces';
import type { CredentialRequest } from '@/requests';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
export const credentialsController = express.Router();
@ -130,7 +131,7 @@ credentialsController.post(
const encryptedData = CredentialsService.createEncryptedData(key, null, newCredential);
const credential = await CredentialsService.save(newCredential, encryptedData, req.user);
void InternalHooksManager.getInstance().onUserCreatedCredentials({
void Container.get(InternalHooks).onUserCreatedCredentials({
user: req.user,
credential_name: newCredential.name,
credential_type: credential.type,

View file

@ -24,6 +24,7 @@ import { ExternalHooks } from '@/ExternalHooks';
import type { User } from '@db/entities/User';
import type { CredentialRequest } from '@/requests';
import { CredentialTypes } from '@/CredentialTypes';
import { Container } from 'typedi';
export class CredentialsService {
static async get(
@ -205,7 +206,7 @@ export class CredentialsService {
credentialId: string,
newCredentialData: ICredentialsDb,
): Promise<ICredentialsDb | null> {
await ExternalHooks().run('credentials.update', [newCredentialData]);
await Container.get(ExternalHooks).run('credentials.update', [newCredentialData]);
// Update the credentials in DB
await Db.collections.Credentials.update(credentialId, newCredentialData);
@ -224,7 +225,7 @@ export class CredentialsService {
const newCredential = new CredentialsEntity();
Object.assign(newCredential, credential, encryptedData);
await ExternalHooks().run('credentials.create', [encryptedData]);
await Container.get(ExternalHooks).run('credentials.create', [encryptedData]);
const role = await Db.collections.Role.findOneByOrFail({
name: 'owner',
@ -256,7 +257,7 @@ export class CredentialsService {
}
static async delete(credentials: CredentialsEntity): Promise<void> {
await ExternalHooks().run('credentials.delete', [credentials.id]);
await Container.get(ExternalHooks).run('credentials.delete', [credentials.id]);
await Db.collections.Credentials.remove(credentials);
}
@ -279,7 +280,7 @@ export class CredentialsService {
): ICredentialDataDecryptedObject {
const copiedData = deepCopy(data);
const credTypes = CredentialTypes();
const credTypes = Container.get(CredentialTypes);
let credType: ICredentialType;
try {
credType = credTypes.getByName(credential.type);

View file

@ -30,6 +30,7 @@ import type { OAuthRequest } from '@/requests';
import { ExternalHooks } from '@/ExternalHooks';
import config from '@/config';
import { getInstanceBaseUrl } from '@/UserManagement/UserManagementHelper';
import { Container } from 'typedi';
export const oauth2CredentialController = express.Router();
@ -129,7 +130,7 @@ oauth2CredentialController.get(
state: stateEncodedStr,
};
await ExternalHooks().run('oauth2.authenticate', [oAuthOptions]);
await Container.get(ExternalHooks).run('oauth2.authenticate', [oAuthOptions]);
const oAuthObj = new ClientOAuth2(oAuthOptions);
@ -281,7 +282,7 @@ oauth2CredentialController.get(
delete oAuth2Parameters.clientSecret;
}
await ExternalHooks().run('oauth2.callback', [oAuth2Parameters]);
await Container.get(ExternalHooks).run('oauth2.callback', [oAuth2Parameters]);
const oAuthObj = new ClientOAuth2(oAuth2Parameters);

View file

@ -3,6 +3,7 @@ import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/mi
import { NodeTypes } from '@/NodeTypes';
import { IConnections, INode } from 'n8n-workflow';
import { getLogger } from '@/Logger';
import { Container } from 'typedi';
export class PurgeInvalidWorkflowConnections1675940580449 implements MigrationInterface {
name = 'PurgeInvalidWorkflowConnections1675940580449';
@ -21,7 +22,7 @@ export class PurgeInvalidWorkflowConnections1675940580449 implements MigrationIn
FROM \`${tablePrefix}workflow_entity\`
`);
const nodeTypes = NodeTypes();
const nodeTypes = Container.get(NodeTypes);
workflows.forEach(async (workflow) => {
let connections: IConnections =

View file

@ -3,6 +3,7 @@ import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/mi
import { NodeTypes } from '@/NodeTypes';
import { IConnections, INode } from 'n8n-workflow';
import { getLogger } from '@/Logger';
import { Container } from 'typedi';
export class PurgeInvalidWorkflowConnections1675940580449 implements MigrationInterface {
name = 'PurgeInvalidWorkflowConnections1675940580449';
@ -17,7 +18,7 @@ export class PurgeInvalidWorkflowConnections1675940580449 implements MigrationIn
FROM "${tablePrefix}workflow_entity"
`);
const nodeTypes = NodeTypes();
const nodeTypes = Container.get(NodeTypes);
workflows.forEach(async (workflow) => {
let connections: IConnections = workflow.connections;

View file

@ -3,6 +3,7 @@ import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/mi
import { NodeTypes } from '@/NodeTypes';
import { IConnections, INode } from 'n8n-workflow';
import { getLogger } from '@/Logger';
import { Container } from 'typedi';
export class PurgeInvalidWorkflowConnections1675940580449 implements MigrationInterface {
name = 'PurgeInvalidWorkflowConnections1675940580449';
@ -18,7 +19,7 @@ export class PurgeInvalidWorkflowConnections1675940580449 implements MigrationIn
FROM "${tablePrefix}workflow_entity"
`);
const nodeTypes = NodeTypes();
const nodeTypes = Container.get(NodeTypes);
workflows.forEach(async (workflow) => {
let connections: IConnections = JSON.parse(workflow.connections);

View file

@ -4,11 +4,12 @@ import { NodeOperationError, WorkflowOperationError } from 'n8n-workflow';
import * as Db from '@/Db';
import type { EventMessageTypes, EventNamesTypes } from '../EventMessageClasses';
import type { DateTime } from 'luxon';
import { InternalHooksManager } from '../../InternalHooksManager';
import { getPushInstance } from '@/push';
import { Push } from '@/push';
import type { IPushDataExecutionRecovered } from '../../Interfaces';
import { workflowExecutionCompleted } from '../../events/WorkflowStatistics';
import { eventBus } from './MessageEventBus';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
export async function recoverExecutionDataFromEventLogMessages(
executionId: string,
@ -151,16 +152,19 @@ export async function recoverExecutionDataFromEventLogMessages(
status: 'crashed',
stoppedAt: lastNodeRunTimestamp?.toJSDate(),
});
const internalHooks = InternalHooksManager.getInstance();
await internalHooks.onWorkflowPostExecute(executionId, executionEntry.workflowData, {
data: executionData,
finished: false,
mode: executionEntry.mode,
waitTill: executionEntry.waitTill ?? undefined,
startedAt: executionEntry.startedAt,
stoppedAt: lastNodeRunTimestamp?.toJSDate(),
status: 'crashed',
});
await Container.get(InternalHooks).onWorkflowPostExecute(
executionId,
executionEntry.workflowData,
{
data: executionData,
finished: false,
mode: executionEntry.mode,
waitTill: executionEntry.waitTill ?? undefined,
startedAt: executionEntry.startedAt,
stoppedAt: lastNodeRunTimestamp?.toJSDate(),
status: 'crashed',
},
);
const iRunData: IRun = {
data: executionData,
finished: false,
@ -178,7 +182,7 @@ export async function recoverExecutionDataFromEventLogMessages(
eventBus.once('editorUiConnected', function handleUiBackUp() {
// add a small timeout to make sure the UI is back up
setTimeout(() => {
getPushInstance().send('executionRecovered', {
Container.get(Push).send('executionRecovered', {
executionId,
} as IPushDataExecutionRecovered);
}, 1000);

View file

@ -1,9 +1,10 @@
import type { INode, IRun, IWorkflowBase } from 'n8n-workflow';
import * as Db from '@/Db';
import { InternalHooksManager } from '@/InternalHooksManager';
import { StatisticsNames } from '@db/entities/WorkflowStatistics';
import { getWorkflowOwner } from '@/UserManagement/UserManagementHelper';
import { QueryFailedError } from 'typeorm';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
export async function workflowExecutionCompleted(
workflowData: IWorkflowBase,
@ -46,7 +47,7 @@ export async function workflowExecutionCompleted(
};
// Send the metrics
await InternalHooksManager.getInstance().onFirstProductionWorkflowSuccess(metrics);
await Container.get(InternalHooks).onFirstProductionWorkflowSuccess(metrics);
} catch (error) {
if (!(error instanceof QueryFailedError)) {
throw error;
@ -101,5 +102,5 @@ export async function nodeFetchedData(
}
// Send metrics to posthog
await InternalHooksManager.getInstance().onFirstWorkflowDataLoad(metrics);
await Container.get(InternalHooks).onFirstWorkflowDataLoad(metrics);
}

View file

@ -15,7 +15,7 @@ import type {
import { deepCopy, LoggerProxy, jsonParse, Workflow } from 'n8n-workflow';
import type { FindOperator, FindOptionsWhere } from 'typeorm';
import { In, IsNull, LessThanOrEqual, Not, Raw } from 'typeorm';
import * as ActiveExecutions from '@/ActiveExecutions';
import { ActiveExecutions } from '@/ActiveExecutions';
import config from '@/config';
import type { User } from '@db/entities/User';
import type { ExecutionEntity } from '@db/entities/ExecutionEntity';
@ -34,6 +34,7 @@ import { WorkflowRunner } from '@/WorkflowRunner';
import * as Db from '@/Db';
import * as GenericHelpers from '@/GenericHelpers';
import { parse } from 'flatted';
import { Container } from 'typedi';
interface IGetExecutionsQueryFilter {
id?: FindOperator<string>;
@ -202,7 +203,7 @@ export class ExecutionsService {
// We may have manual executions even with queue so we must account for these.
executingWorkflowIds.push(
...ActiveExecutions.getInstance()
...Container.get(ActiveExecutions)
.getActiveExecutions()
.map(({ id }) => id),
);
@ -446,7 +447,7 @@ export class ExecutionsService {
}
data.workflowData = workflowData;
const nodeTypes = NodeTypes();
const nodeTypes = Container.get(NodeTypes);
const workflowInstance = new Workflow({
id: workflowData.id as string,
name: workflowData.name,
@ -481,7 +482,7 @@ export class ExecutionsService {
const workflowRunner = new WorkflowRunner();
const retriedExecutionId = await workflowRunner.run(data);
const executionData = await ActiveExecutions.getInstance().getPostExecutePromise(
const executionData = await Container.get(ActiveExecutions).getPostExecutePromise(
retriedExecutionId,
);

View file

@ -2,18 +2,13 @@
export * from './CredentialsHelper';
export * from './CredentialTypes';
export * from './CredentialsOverwrites';
export * from './ExternalHooks';
export * from './Interfaces';
export * from './InternalHooksManager';
export * from './LoadNodesAndCredentials';
export * from './NodeTypes';
export * from './WaitTracker';
export * from './WaitingWebhooks';
export * from './WorkflowCredentials';
export * from './WorkflowRunner';
import * as ActiveExecutions from './ActiveExecutions';
import * as ActiveWorkflowRunner from './ActiveWorkflowRunner';
import { ActiveExecutions } from './ActiveExecutions';
import * as Db from './Db';
import * as GenericHelpers from './GenericHelpers';
import * as ResponseHelper from './ResponseHelper';
@ -26,7 +21,6 @@ import * as WorkflowHelpers from './WorkflowHelpers';
export {
ActiveExecutions,
ActiveWorkflowRunner,
Db,
GenericHelpers,
ResponseHelper,

View file

@ -5,12 +5,13 @@ import { LoggerProxy } from 'n8n-workflow';
import { getLogger } from '@/Logger';
import * as ResponseHelper from '@/ResponseHelper';
import { InternalHooksManager } from '@/InternalHooksManager';
import type { ILicensePostResponse, ILicenseReadResponse } from '@/Interfaces';
import { LicenseService } from './License.service';
import { getLicense } from '@/License';
import type { AuthenticatedRequest, LicenseRequest } from '@/requests';
import { isInstanceOwner } from '@/PublicApi/v1/handlers/users/users.service';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
export const licenseController = express.Router();
@ -115,14 +116,14 @@ licenseController.post(
await license.renew();
} catch (e) {
// not awaiting so as not to make the endpoint hang
void InternalHooksManager.getInstance().onLicenseRenewAttempt({ success: false });
void Container.get(InternalHooks).onLicenseRenewAttempt({ success: false });
if (e instanceof Error) {
throw new ResponseHelper.BadRequestError(e.message);
}
}
// not awaiting so as not to make the endpoint hang
void InternalHooksManager.getInstance().onLicenseRenewAttempt({ success: true });
void Container.get(InternalHooks).onLicenseRenewAttempt({ success: true });
// Return the read data, plus the management JWT
return {

View file

@ -1,8 +1,10 @@
import { Service } from 'typedi';
import type { PostHog } from 'posthog-node';
import type { FeatureFlags, ITelemetryTrackProperties } from 'n8n-workflow';
import config from '@/config';
import type { PublicUser } from '..';
import type { PublicUser } from '@/Interfaces';
@Service()
export class PostHogClient {
private postHog?: PostHog;

View file

@ -29,7 +29,7 @@ export abstract class AbstractPush<T> {
}
}
send<D>(type: IPushDataType, data: D, sessionId: string | undefined = undefined) {
send<D>(type: IPushDataType, data: D, sessionId: string | undefined) {
const { connections } = this;
if (sessionId !== undefined && connections[sessionId] === undefined) {
Logger.error(`The session "${sessionId}" is not registered.`, { sessionId });

View file

@ -4,21 +4,35 @@ import type { Socket } from 'net';
import type { Application, RequestHandler } from 'express';
import { Server as WSServer } from 'ws';
import { parse as parseUrl } from 'url';
import { Container, Service } from 'typedi';
import config from '@/config';
import { resolveJwt } from '@/auth/jwt';
import { AUTH_COOKIE_NAME } from '@/constants';
import { SSEPush } from './sse.push';
import { WebSocketPush } from './websocket.push';
import type { Push, PushResponse, SSEPushRequest, WebSocketPushRequest } from './types';
export type { Push } from './types';
import type { PushResponse, SSEPushRequest, WebSocketPushRequest } from './types';
import type { IPushDataType } from '@/Interfaces';
const useWebSockets = config.getEnv('push.backend') === 'websocket';
let pushInstance: Push;
export const getPushInstance = () => {
if (!pushInstance) pushInstance = useWebSockets ? new WebSocketPush() : new SSEPush();
return pushInstance;
};
@Service()
export class Push {
private backend = useWebSockets ? new WebSocketPush() : new SSEPush();
handleRequest(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) {
if (req.ws) {
(this.backend as WebSocketPush).add(req.query.sessionId, req.ws);
} else if (!useWebSockets) {
(this.backend as SSEPush).add(req.query.sessionId, { req, res });
} else {
res.status(401).send('Unauthorized');
}
}
send<D>(type: IPushDataType, data: D, sessionId: string | undefined = undefined) {
this.backend.send(type, data, sessionId);
}
}
export const setupPushServer = (restEndpoint: string, server: Server, app: Application) => {
if (useWebSockets) {
@ -48,7 +62,6 @@ export const setupPushHandler = (
app: Application,
isUserManagementEnabled: boolean,
) => {
const push = getPushInstance();
const endpoint = `/${restEndpoint}/push`;
const pushValidationMiddleware: RequestHandler = async (
@ -89,17 +102,10 @@ export const setupPushHandler = (
next();
};
const push = Container.get(Push);
app.use(
endpoint,
pushValidationMiddleware,
(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) => {
if (req.ws) {
(push as WebSocketPush).add(req.query.sessionId, req.ws);
} else if (!useWebSockets) {
(push as SSEPush).add(req.query.sessionId, { req, res });
} else {
res.status(401).send('Unauthorized');
}
},
(req: SSEPushRequest | WebSocketPushRequest, res: PushResponse) => push.handleRequest(req, res),
);
};

View file

@ -1,12 +1,8 @@
import type { Request, Response } from 'express';
import type { WebSocket } from 'ws';
import type { SSEPush } from './sse.push';
import type { WebSocketPush } from './websocket.push';
// TODO: move all push related types here
export type Push = SSEPush | WebSocketPush;
export type PushRequest = Request<{}, {}, {}, { sessionId: string }>;
export type SSEPushRequest = PushRequest & { ws: undefined };

View file

@ -1,7 +1,7 @@
/* eslint-disable @typescript-eslint/no-unsafe-call */
/* eslint-disable @typescript-eslint/no-unsafe-member-access */
import type RudderStack from '@rudderstack/rudder-sdk-node';
import type { PostHogClient } from '../posthog';
import { PostHogClient } from '@/posthog';
import type { ITelemetryTrackProperties } from 'n8n-workflow';
import { LoggerProxy } from 'n8n-workflow';
import config from '@/config';
@ -10,6 +10,7 @@ import { getLogger } from '@/Logger';
import { getLicense } from '@/License';
import { LicenseService } from '@/license/License.service';
import { N8N_VERSION } from '@/constants';
import { Service } from 'typedi';
type ExecutionTrackDataKey = 'manual_error' | 'manual_success' | 'prod_error' | 'prod_success';
@ -28,14 +29,21 @@ interface IExecutionsBuffer {
};
}
@Service()
export class Telemetry {
private instanceId: string;
private rudderStack?: RudderStack;
private pulseIntervalReference: NodeJS.Timeout;
private executionCountsBuffer: IExecutionsBuffer = {};
constructor(private instanceId: string, private postHog: PostHogClient) {}
constructor(private postHog: PostHogClient) {}
setInstanceId(instanceId: string) {
this.instanceId = instanceId;
}
async init() {
const enabled = config.getEnv('diagnostics.enabled');

View file

@ -1,7 +1,6 @@
import express from 'express';
import { v4 as uuid } from 'uuid';
import * as Db from '@/Db';
import { InternalHooksManager } from '@/InternalHooksManager';
import * as ResponseHelper from '@/ResponseHelper';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import config from '@/config';
@ -18,6 +17,8 @@ import { EECredentialsService as EECredentials } from '../credentials/credential
import type { IExecutionPushResponse } from '@/Interfaces';
import * as GenericHelpers from '@/GenericHelpers';
import { In } from 'typeorm';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
// eslint-disable-next-line @typescript-eslint/naming-convention
export const EEWorkflowController = express.Router();
@ -75,7 +76,7 @@ EEWorkflowController.put(
}
});
void InternalHooksManager.getInstance().onWorkflowSharingUpdate(
void Container.get(InternalHooks).onWorkflowSharingUpdate(
workflowId,
req.user.id,
shareWithIds,
@ -126,7 +127,7 @@ EEWorkflowController.post(
await validateEntity(newWorkflow);
await ExternalHooks().run('workflow.create', [newWorkflow]);
await Container.get(ExternalHooks).run('workflow.create', [newWorkflow]);
const { tags: tagIds } = req.body;
@ -190,8 +191,8 @@ EEWorkflowController.post(
});
}
await ExternalHooks().run('workflow.afterCreate', [savedWorkflow]);
void InternalHooksManager.getInstance().onWorkflowCreated(req.user, newWorkflow, false);
await Container.get(ExternalHooks).run('workflow.afterCreate', [savedWorkflow]);
void Container.get(InternalHooks).onWorkflowCreated(req.user, newWorkflow, false);
return savedWorkflow;
}),

View file

@ -15,7 +15,6 @@ import * as TagHelpers from '@/TagHelpers';
import { SharedWorkflow } from '@db/entities/SharedWorkflow';
import { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { validateEntity } from '@/GenericHelpers';
import { InternalHooksManager } from '@/InternalHooksManager';
import { ExternalHooks } from '@/ExternalHooks';
import { getLogger } from '@/Logger';
import type { WorkflowRequest } from '@/requests';
@ -24,6 +23,8 @@ import { EEWorkflowController } from './workflows.controller.ee';
import { WorkflowsService } from './workflows.services';
import { whereClause } from '@/UserManagement/UserManagementHelper';
import { In } from 'typeorm';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
export const workflowsController = express.Router();
@ -57,7 +58,7 @@ workflowsController.post(
await validateEntity(newWorkflow);
await ExternalHooks().run('workflow.create', [newWorkflow]);
await Container.get(ExternalHooks).run('workflow.create', [newWorkflow]);
const { tags: tagIds } = req.body;
@ -106,8 +107,8 @@ workflowsController.post(
});
}
await ExternalHooks().run('workflow.afterCreate', [savedWorkflow]);
void InternalHooksManager.getInstance().onWorkflowCreated(req.user, newWorkflow, false);
await Container.get(ExternalHooks).run('workflow.afterCreate', [savedWorkflow]);
void Container.get(InternalHooks).onWorkflowCreated(req.user, newWorkflow, false);
return savedWorkflow;
}),

View file

@ -1,3 +1,4 @@
import { Container } from 'typedi';
import { validate as jsonSchemaValidate } from 'jsonschema';
import type { INode, IPinData, JsonObject } from 'n8n-workflow';
import { NodeApiError, jsonParse, LoggerProxy, Workflow } from 'n8n-workflow';
@ -5,9 +6,8 @@ import type { FindOptionsSelect, FindOptionsWhere, UpdateResult } from 'typeorm'
import { In } from 'typeorm';
import pick from 'lodash.pick';
import { v4 as uuid } from 'uuid';
import * as ActiveWorkflowRunner from '@/ActiveWorkflowRunner';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import * as Db from '@/Db';
import { InternalHooksManager } from '@/InternalHooksManager';
import * as ResponseHelper from '@/ResponseHelper';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import config from '@/config';
@ -22,10 +22,11 @@ import type { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import { WorkflowRunner } from '@/WorkflowRunner';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import * as TestWebhooks from '@/TestWebhooks';
import { TestWebhooks } from '@/TestWebhooks';
import { getSharedWorkflowIds } from '@/WorkflowHelpers';
import { isSharingEnabled, whereClause } from '@/UserManagement/UserManagementHelper';
import type { WorkflowForList } from '@/workflows/workflows.types';
import { InternalHooks } from '@/InternalHooks';
export type IGetWorkflowsQueryFilter = Pick<
FindOptionsWhere<WorkflowEntity>,
@ -91,7 +92,7 @@ export class WorkflowsService {
nodes: workflow.nodes,
connections: workflow.connections,
active: workflow.active,
nodeTypes: NodeTypes(),
nodeTypes: Container.get(NodeTypes),
}).getParentNodes(startNodeName);
let checkNodeName = '';
@ -236,12 +237,12 @@ export class WorkflowsService {
WorkflowHelpers.addNodeIds(workflow);
await ExternalHooks().run('workflow.update', [workflow]);
await Container.get(ExternalHooks).run('workflow.update', [workflow]);
if (shared.workflow.active) {
// When workflow gets saved always remove it as the triggers could have been
// changed and so the changes would not take effect
await ActiveWorkflowRunner.getInstance().remove(workflowId);
await Container.get(ActiveWorkflowRunner).remove(workflowId);
}
if (workflow.settings) {
@ -319,14 +320,14 @@ export class WorkflowsService {
});
}
await ExternalHooks().run('workflow.afterUpdate', [updatedWorkflow]);
void InternalHooksManager.getInstance().onWorkflowSaved(user, updatedWorkflow, false);
await Container.get(ExternalHooks).run('workflow.afterUpdate', [updatedWorkflow]);
void Container.get(InternalHooks).onWorkflowSaved(user, updatedWorkflow, false);
if (updatedWorkflow.active) {
// When the workflow is supposed to be active add it again
try {
await ExternalHooks().run('workflow.activate', [updatedWorkflow]);
await ActiveWorkflowRunner.getInstance().add(
await Container.get(ExternalHooks).run('workflow.activate', [updatedWorkflow]);
await Container.get(ActiveWorkflowRunner).add(
workflowId,
shared.workflow.active ? 'update' : 'activate',
);
@ -383,14 +384,14 @@ export class WorkflowsService {
nodes: workflowData.nodes,
connections: workflowData.connections,
active: false,
nodeTypes: NodeTypes(),
nodeTypes: Container.get(NodeTypes),
staticData: undefined,
settings: workflowData.settings,
});
const additionalData = await WorkflowExecuteAdditionalData.getBase(user.id);
const needsWebhook = await TestWebhooks.getInstance().needsWebhookData(
const needsWebhook = await Container.get(TestWebhooks).needsWebhookData(
workflowData,
workflow,
additionalData,
@ -436,7 +437,7 @@ export class WorkflowsService {
}
static async delete(user: User, workflowId: string): Promise<WorkflowEntity | undefined> {
await ExternalHooks().run('workflow.delete', [workflowId]);
await Container.get(ExternalHooks).run('workflow.delete', [workflowId]);
const sharedWorkflow = await Db.collections.SharedWorkflow.findOne({
relations: ['workflow', 'role'],
@ -454,13 +455,13 @@ export class WorkflowsService {
if (sharedWorkflow.workflow.active) {
// deactivate before deleting
await ActiveWorkflowRunner.getInstance().remove(workflowId);
await Container.get(ActiveWorkflowRunner).remove(workflowId);
}
await Db.collections.Workflow.delete(workflowId);
void InternalHooksManager.getInstance().onWorkflowDeleted(user, workflowId, false);
await ExternalHooks().run('workflow.afterDelete', [workflowId]);
void Container.get(InternalHooks).onWorkflowDeleted(user, workflowId, false);
await Container.get(ExternalHooks).run('workflow.afterDelete', [workflowId]);
return sharedWorkflow.workflow;
}

View file

@ -6,6 +6,13 @@ import { OFFICIAL_RISKY_NODE_TYPES, NODES_REPORT } from '@/audit/constants';
import { getRiskSection, MOCK_PACKAGE, saveManualTriggerWorkflow } from './utils';
import * as testDb from '../shared/testDb';
import { toReportTitle } from '@/audit/utils';
import { mockInstance } from '../shared/utils';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { NodeTypes } from '@/NodeTypes';
const nodesAndCredentials = mockInstance(LoadNodesAndCredentials);
nodesAndCredentials.getCustomDirectories.mockReturnValue([]);
mockInstance(NodeTypes);
beforeAll(async () => {
await testDb.init();

View file

@ -2,10 +2,17 @@ import * as Db from '@/Db';
import { Reset } from '@/commands/user-management/reset';
import type { Role } from '@db/entities/Role';
import * as testDb from '../shared/testDb';
import { mockInstance } from '../shared/utils';
import { InternalHooks } from '@/InternalHooks';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { NodeTypes } from '@/NodeTypes';
let globalOwnerRole: Role;
beforeAll(async () => {
mockInstance(InternalHooks);
mockInstance(LoadNodesAndCredentials);
mockInstance(NodeTypes);
await testDb.init();
globalOwnerRole = await testDb.getGlobalOwnerRole();

View file

@ -20,6 +20,12 @@ import type { Role } from '@db/entities/Role';
import type { AuthAgent } from './shared/types';
import type { InstalledNodes } from '@db/entities/InstalledNodes';
import { COMMUNITY_PACKAGE_VERSION } from './shared/constants';
import { NodeTypes } from '@/NodeTypes';
import { Push } from '@/push';
const mockLoadNodesAndCredentials = utils.mockInstance(LoadNodesAndCredentials);
utils.mockInstance(NodeTypes);
utils.mockInstance(Push);
jest.mock('@/CommunityNodes/helpers', () => {
return {
@ -213,7 +219,7 @@ test('POST /nodes should allow installing packages that could not be loaded', as
mocked(hasPackageLoaded).mockReturnValueOnce(false);
mocked(checkNpmPackageStatus).mockResolvedValueOnce({ status: 'OK' });
jest.spyOn(LoadNodesAndCredentials(), 'loadNpmModule').mockImplementationOnce(mockedEmptyPackage);
mockLoadNodesAndCredentials.loadNpmModule.mockImplementationOnce(mockedEmptyPackage);
const { statusCode } = await authAgent(ownerShell).post('/nodes').send({
name: utils.installedPackagePayload().packageName,
@ -267,9 +273,7 @@ test('DELETE /nodes should reject if package is not installed', async () => {
test('DELETE /nodes should uninstall package', async () => {
const ownerShell = await testDb.createUserShell(globalOwnerRole);
const removeSpy = jest
.spyOn(LoadNodesAndCredentials(), 'removeNpmModule')
.mockImplementationOnce(jest.fn());
const removeSpy = mockLoadNodesAndCredentials.removeNpmModule.mockImplementationOnce(jest.fn());
mocked(findInstalledPackage).mockImplementationOnce(mockedEmptyPackage);
@ -310,9 +314,8 @@ test('PATCH /nodes reject if package is not installed', async () => {
test('PATCH /nodes should update a package', async () => {
const ownerShell = await testDb.createUserShell(globalOwnerRole);
const updateSpy = jest
.spyOn(LoadNodesAndCredentials(), 'updateNpmModule')
.mockImplementationOnce(mockedEmptyPackage);
const updateSpy =
mockLoadNodesAndCredentials.updateNpmModule.mockImplementationOnce(mockedEmptyPackage);
mocked(findInstalledPackage).mockImplementationOnce(mockedEmptyPackage);

View file

@ -1,3 +1,4 @@
import { Container } from 'typedi';
import { randomBytes } from 'crypto';
import { existsSync } from 'fs';
@ -25,14 +26,15 @@ import {
import superagent from 'superagent';
import request from 'supertest';
import { URL } from 'url';
import { mock } from 'jest-mock-extended';
import { DeepPartial } from 'ts-essentials';
import config from '@/config';
import * as Db from '@/Db';
import { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { CredentialTypes } from '@/CredentialTypes';
import { ExternalHooks } from '@/ExternalHooks';
import { InternalHooksManager } from '@/InternalHooksManager';
import { NodeTypes } from '@/NodeTypes';
import * as ActiveWorkflowRunner from '@/ActiveWorkflowRunner';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import { nodesController } from '@/api/nodes.api';
import { workflowsController } from '@/workflows/workflows.controller';
import { AUTH_COOKIE_NAME, NODE_PACKAGE_PREFIX } from '@/constants';
@ -74,16 +76,25 @@ import * as testDb from '../shared/testDb';
import { v4 as uuid } from 'uuid';
import { handleLdapInit } from '@/Ldap/helpers';
import { ldapController } from '@/Ldap/routes/ldap.controller.ee';
import { InternalHooks } from '@/InternalHooks';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { PostHogClient } from '@/posthog';
export const mockInstance = <T>(
ctor: new (...args: any[]) => T,
data: DeepPartial<T> | undefined = undefined,
) => {
const instance = mock<T>(data);
Container.set(ctor, instance);
return instance;
};
const loadNodesAndCredentials: INodesAndCredentials = {
loaded: { nodes: {}, credentials: {} },
known: { nodes: {}, credentials: {} },
credentialTypes: {} as ICredentialTypes,
};
const mockNodeTypes = NodeTypes(loadNodesAndCredentials);
CredentialTypes(loadNodesAndCredentials);
Container.set(LoadNodesAndCredentials, loadNodesAndCredentials);
/**
* Initialize a test server.
@ -108,11 +119,9 @@ export async function initTestServer({
const logger = getLogger();
LoggerProxy.init(logger);
const postHog = new PostHogClient();
postHog.init('test-instance-id');
// Pre-requisite: Mock the telemetry module before calling.
await InternalHooksManager.init('test-instance-id', mockNodeTypes, postHog);
// Mock all telemetry.
mockInstance(InternalHooks);
mockInstance(PostHogClient);
testServer.app.use(bodyParser.json());
testServer.app.use(bodyParser.urlencoded({ extended: true }));
@ -137,7 +146,7 @@ export async function initTestServer({
endpointGroups.includes('users') ||
endpointGroups.includes('passwordReset')
) {
testServer.externalHooks = ExternalHooks();
testServer.externalHooks = Container.get(ExternalHooks);
}
const [routerEndpoints, functionEndpoints] = classifyEndpointGroups(endpointGroups);
@ -167,8 +176,8 @@ export async function initTestServer({
}
if (functionEndpoints.length) {
const externalHooks = ExternalHooks();
const internalHooks = InternalHooksManager.getInstance();
const externalHooks = Container.get(ExternalHooks);
const internalHooks = Container.get(InternalHooks);
const mailer = UserManagementMailer.getInstance();
const repositories = Db.collections;
@ -218,7 +227,7 @@ export async function initTestServer({
externalHooks,
internalHooks,
repositories,
activeWorkflowRunner: ActiveWorkflowRunner.getInstance(),
activeWorkflowRunner: Container.get(ActiveWorkflowRunner),
logger,
}),
);
@ -261,8 +270,8 @@ const classifyEndpointGroups = (endpointGroups: EndpointGroup[]) => {
/**
* Initialize node types.
*/
export async function initActiveWorkflowRunner(): Promise<ActiveWorkflowRunner.ActiveWorkflowRunner> {
const workflowRunner = ActiveWorkflowRunner.getInstance();
export async function initActiveWorkflowRunner(): Promise<ActiveWorkflowRunner> {
const workflowRunner = Container.get(ActiveWorkflowRunner);
workflowRunner.init();
return workflowRunner;
}
@ -303,7 +312,7 @@ export function gitHubCredentialType(): ICredentialType {
* Initialize node types.
*/
export async function initCredentialsTypes(): Promise<void> {
loadNodesAndCredentials.loaded.credentials = {
Container.get(LoadNodesAndCredentials).loaded.credentials = {
githubApi: {
type: gitHubCredentialType(),
sourcePath: '',
@ -322,7 +331,7 @@ export async function initLdapManager(): Promise<void> {
* Initialize node types.
*/
export async function initNodeTypes() {
loadNodesAndCredentials.loaded.nodes = {
Container.get(LoadNodesAndCredentials).loaded.nodes = {
'n8n-nodes-base.start': {
sourcePath: '',
type: {

View file

@ -1,7 +1,13 @@
import { v4 as uuid } from 'uuid';
import { mocked } from 'jest-mock';
import { ICredentialTypes, LoggerProxy, NodeOperationError, Workflow } from 'n8n-workflow';
import {
ICredentialTypes,
INodesAndCredentials,
LoggerProxy,
NodeOperationError,
Workflow,
} from 'n8n-workflow';
import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner';
import * as Db from '@/Db';
@ -10,12 +16,17 @@ import { SharedWorkflow } from '@/databases/entities/SharedWorkflow';
import { Role } from '@/databases/entities/Role';
import { User } from '@/databases/entities/User';
import { getLogger } from '@/Logger';
import { NodeTypes } from '@/NodeTypes';
import { CredentialTypes } from '@/CredentialTypes';
import { randomEmail, randomName } from '../integration/shared/random';
import * as Helpers from './Helpers';
import { WorkflowExecuteAdditionalData } from '@/index';
import { WorkflowRunner } from '@/WorkflowRunner';
import { mock } from 'jest-mock-extended';
import { ExternalHooks } from '@/ExternalHooks';
import { Container } from 'typedi';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import { mockInstance } from '../integration/shared/utils';
import { Push } from '@/push';
/**
* TODO:
@ -112,19 +123,6 @@ jest.mock('@/Db', () => {
};
});
const mockExternalHooksRunFunction = jest.fn();
jest.mock('@/ExternalHooks', () => {
return {
ExternalHooks: () => {
return {
run: () => mockExternalHooksRunFunction(),
init: () => Promise.resolve(),
};
},
};
});
const workflowCheckIfCanBeActivated = jest.fn(() => true);
jest
@ -140,30 +138,26 @@ const workflowExecuteAdditionalDataExecuteErrorWorkflowSpy = jest.spyOn(
);
describe('ActiveWorkflowRunner', () => {
let externalHooks: ExternalHooks;
let activeWorkflowRunner: ActiveWorkflowRunner;
beforeAll(async () => {
LoggerProxy.init(getLogger());
NodeTypes({
const nodesAndCredentials: INodesAndCredentials = {
loaded: {
nodes: MOCK_NODE_TYPES_DATA,
credentials: {},
},
known: { nodes: {}, credentials: {} },
credentialTypes: {} as ICredentialTypes,
});
CredentialTypes({
loaded: {
nodes: MOCK_NODE_TYPES_DATA,
credentials: {},
},
known: { nodes: {}, credentials: {} },
credentialTypes: {} as ICredentialTypes,
});
};
Container.set(LoadNodesAndCredentials, nodesAndCredentials);
mockInstance(Push);
});
beforeEach(() => {
activeWorkflowRunner = new ActiveWorkflowRunner();
externalHooks = mock();
activeWorkflowRunner = new ActiveWorkflowRunner(externalHooks);
});
afterEach(async () => {
@ -173,84 +167,84 @@ describe('ActiveWorkflowRunner', () => {
});
test('Should initialize activeWorkflowRunner with empty list of active workflows and call External Hooks', async () => {
void (await activeWorkflowRunner.init());
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(0);
expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled();
expect(mocked(Db.collections.Webhook.clear)).toHaveBeenCalled();
expect(mockExternalHooksRunFunction).toHaveBeenCalledTimes(1);
expect(externalHooks.run).toHaveBeenCalledTimes(1);
});
test('Should initialize activeWorkflowRunner with one active workflow', async () => {
databaseActiveWorkflowsCount = 1;
void (await activeWorkflowRunner.init());
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(
databaseActiveWorkflowsCount,
);
expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled();
expect(mocked(Db.collections.Webhook.clear)).toHaveBeenCalled();
expect(mockExternalHooksRunFunction).toHaveBeenCalled();
expect(externalHooks.run).toHaveBeenCalled();
});
test('Should make sure function checkIfWorkflowCanBeActivated was called for every workflow', async () => {
databaseActiveWorkflowsCount = 2;
void (await activeWorkflowRunner.init());
await activeWorkflowRunner.init();
expect(workflowCheckIfCanBeActivated).toHaveBeenCalledTimes(databaseActiveWorkflowsCount);
});
test('Call to removeAll should remove every workflow', async () => {
databaseActiveWorkflowsCount = 2;
void (await activeWorkflowRunner.init());
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(
databaseActiveWorkflowsCount,
);
void (await activeWorkflowRunner.removeAll());
await activeWorkflowRunner.removeAll();
expect(removeFunction).toHaveBeenCalledTimes(databaseActiveWorkflowsCount);
});
test('Call to remove should also call removeWorkflowWebhooks', async () => {
databaseActiveWorkflowsCount = 1;
void (await activeWorkflowRunner.init());
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(
databaseActiveWorkflowsCount,
);
void (await activeWorkflowRunner.remove('1'));
await activeWorkflowRunner.remove('1');
expect(removeWebhooksFunction).toHaveBeenCalledTimes(1);
});
test('Call to isActive should return true for valid workflow', async () => {
databaseActiveWorkflowsCount = 1;
void (await activeWorkflowRunner.init());
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.isActive('1')).toBe(true);
});
test('Call to isActive should return false for invalid workflow', async () => {
databaseActiveWorkflowsCount = 1;
void (await activeWorkflowRunner.init());
await activeWorkflowRunner.init();
expect(await activeWorkflowRunner.isActive('2')).toBe(false);
});
test('Calling add should call checkIfWorkflowCanBeActivated', async () => {
// Initialize with default (0) workflows
void (await activeWorkflowRunner.init());
await activeWorkflowRunner.init();
generateWorkflows(1);
void (await activeWorkflowRunner.add('1', 'activate'));
await activeWorkflowRunner.add('1', 'activate');
expect(workflowCheckIfCanBeActivated).toHaveBeenCalledTimes(1);
});
test('runWorkflow should call run method in WorkflowRunner', async () => {
void (await activeWorkflowRunner.init());
await activeWorkflowRunner.init();
const workflow = generateWorkflows(1);
const additionalData = await WorkflowExecuteAdditionalData.getBase('fake-user-id');
workflowRunnerRun.mockImplementationOnce(() => Promise.resolve('invalid-execution-id'));
void (await activeWorkflowRunner.runWorkflow(
await activeWorkflowRunner.runWorkflow(
workflow[0],
workflow[0].nodes[0],
[[]],
additionalData,
'trigger',
));
);
expect(workflowRunnerRun).toHaveBeenCalledTimes(1);
});
@ -258,7 +252,7 @@ describe('ActiveWorkflowRunner', () => {
test('executeErrorWorkflow should call function with same name in WorkflowExecuteAdditionalData', async () => {
const workflowData = generateWorkflows(1)[0];
const error = new NodeOperationError(workflowData.nodes[0], 'Fake error message');
void (await activeWorkflowRunner.init());
await activeWorkflowRunner.init();
activeWorkflowRunner.executeErrorWorkflow(error, workflowData, 'trigger');
expect(workflowExecuteAdditionalDataExecuteErrorWorkflowSpy).toHaveBeenCalledTimes(1);
});

View file

@ -1,47 +1,47 @@
import type { ICredentialTypes, INodesAndCredentials } from 'n8n-workflow';
import { CredentialTypes } from '@/CredentialTypes';
import { Container } from 'typedi';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
describe('ActiveExecutions', () => {
let credentialTypes: ICredentialTypes;
describe('CredentialTypes', () => {
const mockNodesAndCredentials: INodesAndCredentials = {
loaded: {
nodes: {},
credentials: {
fakeFirstCredential: {
type: {
name: 'fakeFirstCredential',
displayName: 'Fake First Credential',
properties: [],
},
sourcePath: '',
},
fakeSecondCredential: {
type: {
name: 'fakeSecondCredential',
displayName: 'Fake Second Credential',
properties: [],
},
sourcePath: '',
},
},
},
known: { nodes: {}, credentials: {} },
credentialTypes: {} as ICredentialTypes,
};
beforeEach(() => {
credentialTypes = CredentialTypes(mockNodesAndCredentials());
});
Container.set(LoadNodesAndCredentials, mockNodesAndCredentials);
const credentialTypes = Container.get(CredentialTypes);
test('Should throw error when calling invalid credential name', () => {
expect(() => credentialTypes.getByName('fakeThirdCredential')).toThrowError();
});
test('Should return correct credential type for valid name', () => {
const mockedCredentialTypes = mockNodesAndCredentials().loaded.credentials;
const mockedCredentialTypes = mockNodesAndCredentials.loaded.credentials;
expect(credentialTypes.getByName('fakeFirstCredential')).toStrictEqual(
mockedCredentialTypes.fakeFirstCredential.type,
);
});
});
const mockNodesAndCredentials = (): INodesAndCredentials => ({
loaded: {
nodes: {},
credentials: {
fakeFirstCredential: {
type: {
name: 'fakeFirstCredential',
displayName: 'Fake First Credential',
properties: [],
},
sourcePath: '',
},
fakeSecondCredential: {
type: {
name: 'fakeSecondCredential',
displayName: 'Fake Second Credential',
properties: [],
},
sourcePath: '',
},
},
},
known: { nodes: {}, credentials: {} },
credentialTypes: {} as ICredentialTypes,
});

View file

@ -12,6 +12,9 @@ import {
import { CredentialsHelper } from '@/CredentialsHelper';
import { CredentialTypes } from '@/CredentialTypes';
import * as Helpers from './Helpers';
import { Container } from 'typedi';
import { NodeTypes } from '@/NodeTypes';
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
const TEST_ENCRYPTION_KEY = 'test';
const mockNodesAndCredentials: INodesAndCredentials = {
@ -19,6 +22,7 @@ const mockNodesAndCredentials: INodesAndCredentials = {
known: { nodes: {}, credentials: {} },
credentialTypes: {} as ICredentialTypes,
};
Container.set(LoadNodesAndCredentials, mockNodesAndCredentials);
describe('CredentialsHelper', () => {
describe('authenticate', () => {
@ -215,7 +219,7 @@ describe('CredentialsHelper', () => {
qs: {},
};
const nodeTypes = Helpers.NodeTypes();
const nodeTypes = Helpers.NodeTypes() as unknown as NodeTypes;
const workflow = new Workflow({
nodes: [node],
@ -235,7 +239,7 @@ describe('CredentialsHelper', () => {
},
};
const credentialTypes = CredentialTypes(mockNodesAndCredentials);
const credentialTypes = Container.get(CredentialTypes);
const credentialsHelper = new CredentialsHelper(
TEST_ENCRYPTION_KEY,

View file

@ -3,23 +3,13 @@ import { QueryFailedError } from 'typeorm';
import config from '@/config';
import { Db } from '@/index';
import { nodeFetchedData, workflowExecutionCompleted } from '@/events/WorkflowStatistics';
import { InternalHooksManager } from '@/InternalHooksManager';
import { getLogger } from '@/Logger';
import * as UserManagementHelper from '@/UserManagement/UserManagementHelper';
import * as UserManagementHelper from '@/UserManagement/UserManagementHelper';
import { InternalHooks } from '@/InternalHooks';
import { mockInstance } from '../integration/shared/utils';
const FAKE_USER_ID = 'abcde-fghij';
const mockedFirstProductionWorkflowSuccess = jest.fn((...args) => {});
const mockedFirstWorkflowDataLoad = jest.fn((...args) => {});
jest.spyOn(InternalHooksManager, 'getInstance').mockImplementation((...args) => {
const actual = jest.requireActual('@/InternalHooks');
return {
...actual,
onFirstProductionWorkflowSuccess: mockedFirstProductionWorkflowSuccess,
onFirstWorkflowDataLoad: mockedFirstWorkflowDataLoad,
};
});
jest.mock('@/Db', () => {
return {
collections: {
@ -30,11 +20,13 @@ jest.mock('@/Db', () => {
},
};
});
jest.spyOn(UserManagementHelper, 'getWorkflowOwner').mockImplementation(async (workflowId) => {
jest.spyOn(UserManagementHelper, 'getWorkflowOwner').mockImplementation(async (_workflowId) => {
return { id: FAKE_USER_ID };
});
describe('Events', () => {
const internalHooks = mockInstance(InternalHooks);
beforeAll(() => {
config.set('diagnostics.enabled', true);
config.set('deployment.type', 'n8n-testing');
@ -47,8 +39,8 @@ describe('Events', () => {
});
beforeEach(() => {
mockedFirstProductionWorkflowSuccess.mockClear();
mockedFirstWorkflowDataLoad.mockClear();
internalHooks.onFirstProductionWorkflowSuccess.mockClear();
internalHooks.onFirstWorkflowDataLoad.mockClear();
});
afterEach(() => {});
@ -72,8 +64,8 @@ describe('Events', () => {
startedAt: new Date(),
};
await workflowExecutionCompleted(workflow, runData);
expect(mockedFirstProductionWorkflowSuccess).toBeCalledTimes(1);
expect(mockedFirstProductionWorkflowSuccess).toHaveBeenNthCalledWith(1, {
expect(internalHooks.onFirstProductionWorkflowSuccess).toBeCalledTimes(1);
expect(internalHooks.onFirstProductionWorkflowSuccess).toHaveBeenNthCalledWith(1, {
user_id: FAKE_USER_ID,
workflow_id: workflow.id,
});
@ -97,12 +89,12 @@ describe('Events', () => {
startedAt: new Date(),
};
await workflowExecutionCompleted(workflow, runData);
expect(mockedFirstProductionWorkflowSuccess).toBeCalledTimes(0);
expect(internalHooks.onFirstProductionWorkflowSuccess).toBeCalledTimes(0);
});
test('should not send metrics for updated entries', async () => {
// Call the function with a fail insert, ensure update is called *and* metrics aren't sent
Db.collections.WorkflowStatistics.insert.mockImplementationOnce((...args) => {
Db.collections.WorkflowStatistics.insert.mockImplementationOnce(() => {
throw new QueryFailedError('invalid insert', [], '');
});
const workflow = {
@ -121,7 +113,7 @@ describe('Events', () => {
startedAt: new Date(),
};
await workflowExecutionCompleted(workflow, runData);
expect(mockedFirstProductionWorkflowSuccess).toBeCalledTimes(0);
expect(internalHooks.onFirstProductionWorkflowSuccess).toBeCalledTimes(0);
});
});
@ -138,8 +130,8 @@ describe('Events', () => {
parameters: {},
};
await nodeFetchedData(workflowId, node);
expect(mockedFirstWorkflowDataLoad).toBeCalledTimes(1);
expect(mockedFirstWorkflowDataLoad).toHaveBeenNthCalledWith(1, {
expect(internalHooks.onFirstWorkflowDataLoad).toBeCalledTimes(1);
expect(internalHooks.onFirstWorkflowDataLoad).toHaveBeenNthCalledWith(1, {
user_id: FAKE_USER_ID,
workflow_id: workflowId,
node_type: node.type,
@ -165,8 +157,8 @@ describe('Events', () => {
},
};
await nodeFetchedData(workflowId, node);
expect(mockedFirstWorkflowDataLoad).toBeCalledTimes(1);
expect(mockedFirstWorkflowDataLoad).toHaveBeenNthCalledWith(1, {
expect(internalHooks.onFirstWorkflowDataLoad).toBeCalledTimes(1);
expect(internalHooks.onFirstWorkflowDataLoad).toHaveBeenNthCalledWith(1, {
user_id: FAKE_USER_ID,
workflow_id: workflowId,
node_type: node.type,
@ -178,7 +170,7 @@ describe('Events', () => {
test('should not send metrics for entries that already have the flag set', async () => {
// Fetch data for workflow 2 which is set up to not be altered in the mocks
Db.collections.WorkflowStatistics.insert.mockImplementationOnce((...args) => {
Db.collections.WorkflowStatistics.insert.mockImplementationOnce(() => {
throw new QueryFailedError('invalid insert', [], '');
});
const workflowId = '1';
@ -191,7 +183,7 @@ describe('Events', () => {
parameters: {},
};
await nodeFetchedData(workflowId, node);
expect(mockedFirstWorkflowDataLoad).toBeCalledTimes(0);
expect(internalHooks.onFirstWorkflowDataLoad).toBeCalledTimes(0);
});
});
});

View file

@ -1,14 +1,13 @@
import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials';
import {
INodesAndCredentials,
INodeType,
INodeTypeData,
INodeTypes,
ITriggerFunctions,
ITriggerResponse,
IVersionedNodeType,
NodeHelpers,
} from 'n8n-workflow';
// TODO: delete this
class NodeTypesClass implements INodeTypes {
nodeTypes: INodeTypeData = {
'test.set': {
@ -80,7 +79,7 @@ class NodeTypesClass implements INodeTypes {
},
};
constructor(nodesAndCredentials?: INodesAndCredentials) {
constructor(nodesAndCredentials?: LoadNodesAndCredentials) {
if (nodesAndCredentials?.loaded?.nodes) {
this.nodeTypes = nodesAndCredentials?.loaded?.nodes;
}
@ -97,7 +96,7 @@ class NodeTypesClass implements INodeTypes {
let nodeTypesInstance: NodeTypesClass | undefined;
export function NodeTypes(nodesAndCredentials?: INodesAndCredentials): NodeTypesClass {
export function NodeTypes(nodesAndCredentials?: LoadNodesAndCredentials): NodeTypesClass {
if (nodeTypesInstance === undefined) {
nodeTypesInstance = new NodeTypesClass(nodesAndCredentials);
}

View file

@ -45,7 +45,8 @@ describe('Telemetry', () => {
const postHog = new PostHogClient();
postHog.init(instanceId);
telemetry = new Telemetry(instanceId, postHog);
telemetry = new Telemetry(postHog);
telemetry.setInstanceId(instanceId);
(telemetry as any).rudderStack = {
flush: () => {},
identify: () => {},

View file

@ -3,7 +3,6 @@ import * as NodeExecuteFunctions from './NodeExecuteFunctions';
import * as UserSettings from './UserSettings';
export * from './ActiveWorkflows';
export * from './ActiveWebhooks';
export * from './BinaryDataManager';
export * from './ClassLoader';
export * from './Constants';

View file

@ -0,0 +1,12 @@
diff --git a/cjs/container-instance.class.js b/cjs/container-instance.class.js
index e473b1e652aa0b6e7462f7ba93fcef2812483b20..1e2ac7e5cb7943f5226a2bc25fa83bee0470f90c 100644
--- a/cjs/container-instance.class.js
+++ b/cjs/container-instance.class.js
@@ -234,6 +234,7 @@ class ContainerInstance {
*/
initializeParams(target, paramTypes) {
return paramTypes.map((paramType, index) => {
+ if (paramType === undefined) throw new ReferenceError('Cannot inject an `undefined` dependency. Possibly a circular dependency detected');
const paramHandler = container_class_1.Container.handlers.find(handler => {
/**
* @Inject()-ed values are stored as parameter handlers and they reference their target

View file

@ -21,6 +21,9 @@ patchedDependencies:
element-ui@2.15.12:
hash: prckukfdop5sl2her6de25cod4
path: patches/element-ui@2.15.12.patch
typedi@0.10.0:
hash: syy565ld7euwcedfbmx53j2qc4
path: patches/typedi@0.10.0.patch
importers:
@ -227,6 +230,7 @@ importers:
posthog-node: ^2.2.2
prom-client: ^13.1.0
psl: ^1.8.0
reflect-metadata: ^0.1.13
replacestream: ^4.0.3
run-script-os: ^1.0.7
semver: ^7.3.8
@ -236,8 +240,10 @@ importers:
sse-channel: ^4.0.0
swagger-ui-express: ^4.3.0
syslog-client: ^1.1.1
ts-essentials: ^7.0.3
tsc-alias: ^1.8.2
tsconfig-paths: ^4.1.2
typedi: ^0.10.0
typeorm: ^0.3.12
uuid: ^8.3.2
validator: 13.7.0
@ -320,6 +326,7 @@ importers:
posthog-node: 2.2.2
prom-client: 13.2.0
psl: 1.9.0
reflect-metadata: 0.1.13
replacestream: 4.0.3
semver: 7.3.8
shelljs: 0.8.5
@ -328,6 +335,7 @@ importers:
sse-channel: 4.0.0
swagger-ui-express: 4.5.0_express@4.18.2
syslog-client: 1.1.1
typedi: 0.10.0
typeorm: 0.3.12_pgelcv6ef3switkrteavpif3pq
uuid: 8.3.2
validator: 13.7.0
@ -381,6 +389,7 @@ importers:
mock-jwks: 1.0.9_nock@13.2.9
nodemon: 2.0.20
run-script-os: 1.1.6
ts-essentials: 7.0.3_typescript@4.9.4
tsc-alias: 1.8.2
tsconfig-paths: 4.1.2
@ -19491,6 +19500,10 @@ packages:
/typedarray/0.0.6:
resolution: {integrity: sha512-/aCDEGatGvZ2BIk+HmLf4ifCJFwvKFNb9/JeZPMulfgFracn9QFcAf5GO8B/mweUjSoblS5In0cWhqpfs/5PQA==}
/typedi/0.10.0:
resolution: {integrity: sha512-v3UJF8xm68BBj6AF4oQML3ikrfK2c9EmZUyLOfShpJuItAqVBHWP/KtpGinkSsIiP6EZyyb6Z3NXyW9dgS9X1w==}
dev: false
/typeorm/0.3.12_pgelcv6ef3switkrteavpif3pq:
resolution: {integrity: sha512-sYSxBmCf1nJLLTcYtwqZ+lQIRtLPyUoO93rHTOKk9vJCyT4UfRtU7oRsJvfvKP3nnZTD1hzz2SEy2zwPEN6OyA==}
engines: {node: '>= 12.9.0'}