From bcfc5e717bd6ccae85ac9642b04ca5ac9ad54080 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Ovejero?= Date: Tue, 25 Jul 2023 18:17:34 +0200 Subject: [PATCH] refactor(core): Move webhook DB access to repository (no-changelog) (#6706) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * refactor(core): Move webhook DB access to repository (no-changelog) * make sure `DataSource` is initialized before it's dependencies at some point I hope to replace `DataSource` with a custom `DatabaseConnection` service class that can then disconnect and reconnect from DB without having to update all repositories. --------- Co-authored-by: कारतोफ्फेलस्क्रिप्ट™ --- packages/cli/src/AbstractServer.ts | 6 +++--- packages/cli/src/ActiveWorkflowRunner.ts | 14 ++++++++------ packages/cli/src/WorkflowRunnerProcess.ts | 6 +++--- .../cli/test/unit/ActiveWorkflowRunner.test.ts | 7 +++++-- 4 files changed, 19 insertions(+), 14 deletions(-) diff --git a/packages/cli/src/AbstractServer.ts b/packages/cli/src/AbstractServer.ts index 02f02becaf..b9dabaa951 100644 --- a/packages/cli/src/AbstractServer.ts +++ b/packages/cli/src/AbstractServer.ts @@ -74,9 +74,6 @@ export abstract class AbstractServer { this.endpointWebhook = config.getEnv('endpoints.webhook'); this.endpointWebhookTest = config.getEnv('endpoints.webhookTest'); this.endpointWebhookWaiting = config.getEnv('endpoints.webhookWaiting'); - - this.externalHooks = Container.get(ExternalHooks); - this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner); } private async setupErrorHandlers() { @@ -438,6 +435,9 @@ export abstract class AbstractServer { await new Promise((resolve) => this.server.listen(PORT, ADDRESS, () => resolve())); + this.externalHooks = Container.get(ExternalHooks); + this.activeWorkflowRunner = Container.get(ActiveWorkflowRunner); + await this.setupHealthCheck(); console.log(`n8n ready on ${ADDRESS}, port ${PORT}`); diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index 66f90b3262..051ffcd5a7 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -67,6 +67,7 @@ import { WorkflowsService } from './workflows/workflows.services'; import { STARTING_NODES } from './constants'; import { webhookNotFoundErrorMessage } from './utils'; import { In } from 'typeorm'; +import { WebhookRepository } from '@db/repositories'; const WEBHOOK_PROD_UNREGISTERED_HINT = "The workflow must be active for a production URL to run successfully. You can activate the workflow using the toggle in the top-right of the editor. Note that unlike test URL calls, production URL calls aren't shown on the canvas (only in the executions list)"; @@ -87,6 +88,7 @@ export class ActiveWorkflowRunner { private activeExecutions: ActiveExecutions, private externalHooks: ExternalHooks, private nodeTypes: NodeTypes, + private webhookRepository: WebhookRepository, ) {} // eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types @@ -110,7 +112,7 @@ export class ActiveWorkflowRunner { // This is not officially supported but there is no reason // it should not work. // Clear up active workflow table - await Db.collections.Webhook.clear(); + await this.webhookRepository.clear(); } if (workflowsData.length !== 0) { @@ -201,7 +203,7 @@ export class ActiveWorkflowRunner { path = path.slice(0, -1); } - let webhook = await Db.collections.Webhook.findOneBy({ + let webhook = await this.webhookRepository.findOneBy({ webhookPath: path, method: httpMethod, }); @@ -212,7 +214,7 @@ export class ActiveWorkflowRunner { // check if a dynamic webhook path exists const pathElements = path.split('/'); webhookId = pathElements.shift(); - const dynamicWebhooks = await Db.collections.Webhook.findBy({ + const dynamicWebhooks = await this.webhookRepository.findBy({ webhookId, method: httpMethod, pathLength: pathElements.length, @@ -333,7 +335,7 @@ export class ActiveWorkflowRunner { * Gets all request methods associated with a single webhook */ async getWebhookMethods(path: string): Promise { - const webhooks = await Db.collections.Webhook.find({ + const webhooks = await this.webhookRepository.find({ select: ['method'], where: { webhookPath: path }, }); @@ -442,7 +444,7 @@ export class ActiveWorkflowRunner { try { // eslint-disable-next-line no-await-in-loop // TODO: this should happen in a transaction, that way we don't need to manually remove this in `catch` - await Db.collections.Webhook.insert(webhook); + await this.webhookRepository.insert(webhook); const webhookExists = await workflow.runWebhookMethod( 'checkExists', webhookData, @@ -552,7 +554,7 @@ export class ActiveWorkflowRunner { await WorkflowHelpers.saveStaticData(workflow); - await Db.collections.Webhook.delete({ + await this.webhookRepository.delete({ workflowId: workflowData.id, }); } diff --git a/packages/cli/src/WorkflowRunnerProcess.ts b/packages/cli/src/WorkflowRunnerProcess.ts index 6beba9bcd9..24f0a2c3a2 100644 --- a/packages/cli/src/WorkflowRunnerProcess.ts +++ b/packages/cli/src/WorkflowRunnerProcess.ts @@ -104,6 +104,9 @@ class WorkflowRunnerProcess { this.startedAt = new Date(); + // Init db since we need to read the license. + await Db.init(); + const userSettings = await UserSettings.prepareUserSettings(); const loadNodesAndCredentials = Container.get(LoadNodesAndCredentials); @@ -117,9 +120,6 @@ class WorkflowRunnerProcess { const externalHooks = Container.get(ExternalHooks); await externalHooks.init(); - // Init db since we need to read the license. - await Db.init(); - const instanceId = userSettings.instanceId ?? ''; await Container.get(PostHogClient).init(instanceId); await Container.get(InternalHooks).init(instanceId); diff --git a/packages/cli/test/unit/ActiveWorkflowRunner.test.ts b/packages/cli/test/unit/ActiveWorkflowRunner.test.ts index 39b2222148..db1cc34209 100644 --- a/packages/cli/test/unit/ActiveWorkflowRunner.test.ts +++ b/packages/cli/test/unit/ActiveWorkflowRunner.test.ts @@ -24,6 +24,7 @@ import { mockInstance } from '../integration/shared/utils/'; import { Push } from '@/push'; import { ActiveExecutions } from '@/ActiveExecutions'; import { NodeTypes } from '@/NodeTypes'; +import type { WebhookRepository } from '@/databases/repositories'; /** * TODO: @@ -139,6 +140,7 @@ const workflowExecuteAdditionalDataExecuteErrorWorkflowSpy = jest.spyOn( describe('ActiveWorkflowRunner', () => { let externalHooks: ExternalHooks; let activeWorkflowRunner: ActiveWorkflowRunner; + let webhookRepository = mock(); beforeAll(async () => { LoggerProxy.init(getLogger()); @@ -160,6 +162,7 @@ describe('ActiveWorkflowRunner', () => { new ActiveExecutions(), externalHooks, Container.get(NodeTypes), + webhookRepository, ); }); @@ -174,7 +177,7 @@ describe('ActiveWorkflowRunner', () => { await activeWorkflowRunner.init(); expect(await activeWorkflowRunner.getActiveWorkflows()).toHaveLength(0); expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled(); - expect(mocked(Db.collections.Webhook.clear)).toHaveBeenCalled(); + expect(mocked(webhookRepository.clear)).toHaveBeenCalled(); expect(externalHooks.run).toHaveBeenCalledTimes(1); }); @@ -185,7 +188,7 @@ describe('ActiveWorkflowRunner', () => { databaseActiveWorkflowsCount, ); expect(mocked(Db.collections.Workflow.find)).toHaveBeenCalled(); - expect(mocked(Db.collections.Webhook.clear)).toHaveBeenCalled(); + expect(mocked(webhookRepository.clear)).toHaveBeenCalled(); expect(externalHooks.run).toHaveBeenCalled(); });