diff --git a/packages/cli/src/ActiveWorkflowRunner.ts b/packages/cli/src/ActiveWorkflowRunner.ts index d08e68f698..5a8927e6eb 100644 --- a/packages/cli/src/ActiveWorkflowRunner.ts +++ b/packages/cli/src/ActiveWorkflowRunner.ts @@ -1,6 +1,5 @@ -/* eslint-disable @typescript-eslint/no-unsafe-call */ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ -/* eslint-disable @typescript-eslint/no-unsafe-assignment */ + import { Service } from 'typedi'; import { ActiveWorkflows, NodeExecuteFunctions } from 'n8n-core'; @@ -211,6 +210,7 @@ export class ActiveWorkflowRunner { error = new WebhookPathTakenError(webhook.node, error); } else if (error.detail) { // it's a error running the webhook methods (checkExists, create) + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment error.message = error.detail; } @@ -400,7 +400,7 @@ export class ActiveWorkflowRunner { error: ExecutionError, workflowData: IWorkflowBase, mode: WorkflowExecuteMode, - ): void { + ) { const fullRunData: IRun = { data: { resultData: { @@ -465,6 +465,7 @@ export class ActiveWorkflowRunner { this.executeErrorWorkflow(error, dbWorkflow, 'internal'); // do not keep trying to activate on authorization error + // eslint-disable-next-line @typescript-eslint/no-unsafe-call if (error.message.includes('Authorization')) continue; this.addQueuedWorkflowActivation('init', dbWorkflow); @@ -629,7 +630,10 @@ export class ActiveWorkflowRunner { * Meaning it will keep on trying to activate it in regular * amounts indefinitely. */ - addQueuedWorkflowActivation(activationMode: WorkflowActivateMode, workflowData: WorkflowEntity) { + private addQueuedWorkflowActivation( + activationMode: WorkflowActivateMode, + workflowData: WorkflowEntity, + ) { const workflowId = workflowData.id; const workflowName = workflowData.name; @@ -685,7 +689,7 @@ export class ActiveWorkflowRunner { /** * Remove a workflow from the activation queue */ - removeQueuedWorkflowActivation(workflowId: string) { + private removeQueuedWorkflowActivation(workflowId: string) { if (this.queuedActivations[workflowId]) { clearTimeout(this.queuedActivations[workflowId].timeout); delete this.queuedActivations[workflowId]; diff --git a/packages/cli/test/integration/ActiveWorkflowRunner.test.ts b/packages/cli/test/integration/ActiveWorkflowRunner.test.ts deleted file mode 100644 index 571d2dedbd..0000000000 --- a/packages/cli/test/integration/ActiveWorkflowRunner.test.ts +++ /dev/null @@ -1,236 +0,0 @@ -import { Container } from 'typedi'; -import { NodeApiError, NodeOperationError, Workflow } from 'n8n-workflow'; -import type { IWebhookData } from 'n8n-workflow'; - -import { ActiveExecutions } from '@/ActiveExecutions'; -import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; -import config from '@/config'; -import { ExternalHooks } from '@/ExternalHooks'; -import { Push } from '@/push'; -import { SecretsHelper } from '@/SecretsHelpers'; -import { WebhookService } from '@/services/webhook.service'; -import * as WebhookHelpers from '@/WebhookHelpers'; -import * as AdditionalData from '@/WorkflowExecuteAdditionalData'; -import type { User } from '@db/entities/User'; -import type { WebhookEntity } from '@db/entities/WebhookEntity'; -import { NodeTypes } from '@/NodeTypes'; -import { ExecutionService } from '@/executions/execution.service'; -import { WorkflowService } from '@/workflows/workflow.service'; -import { ActiveWorkflowsService } from '@/services/activeWorkflows.service'; - -import { mockInstance } from '../shared/mocking'; -import { setSchedulerAsLoadedNode } from './shared/utils'; -import * as testDb from './shared/testDb'; -import { createOwner } from './shared/db/users'; -import { createWorkflow } from './shared/db/workflows'; - -mockInstance(ActiveExecutions); -mockInstance(Push); -mockInstance(SecretsHelper); -mockInstance(ExecutionService); -mockInstance(WorkflowService); - -const webhookService = mockInstance(WebhookService); - -setSchedulerAsLoadedNode(); - -const externalHooks = mockInstance(ExternalHooks); - -let activeWorkflowsService: ActiveWorkflowsService; -let activeWorkflowRunner: ActiveWorkflowRunner; -let owner: User; - -beforeAll(async () => { - await testDb.init(); - - activeWorkflowsService = Container.get(ActiveWorkflowsService); - activeWorkflowRunner = Container.get(ActiveWorkflowRunner); - owner = await createOwner(); -}); - -afterEach(async () => { - await activeWorkflowRunner.removeAll(); - activeWorkflowRunner.removeAllQueuedWorkflowActivations(); - await testDb.truncate(['Workflow']); - config.load(config.default); - jest.restoreAllMocks(); -}); - -afterAll(async () => { - await testDb.terminate(); -}); - -describe('init()', () => { - test('should call `ExternalHooks.run()`', async () => { - const runSpy = jest.spyOn(externalHooks, 'run'); - - await activeWorkflowRunner.init(); - - expect(runSpy).toHaveBeenCalledTimes(1); - const [hook, arg] = runSpy.mock.calls[0]; - expect(hook).toBe('activeWorkflows.initialized'); - expect(arg).toBeEmptyArray(); - }); - - test('should start with no active workflows', async () => { - await activeWorkflowRunner.init(); - - const inStorage = await activeWorkflowsService.getAllActiveIdsInStorage(); - expect(inStorage).toHaveLength(0); - - const inMemory = activeWorkflowRunner.allActiveInMemory(); - expect(inMemory).toHaveLength(0); - }); - - test('should start with one active workflow', async () => { - await createWorkflow({ active: true }, owner); - - await activeWorkflowRunner.init(); - - const inStorage = await activeWorkflowsService.getAllActiveIdsInStorage(); - expect(inStorage).toHaveLength(1); - - const inMemory = activeWorkflowRunner.allActiveInMemory(); - expect(inMemory).toHaveLength(1); - }); - - test('should start with multiple active workflows', async () => { - await createWorkflow({ active: true }, owner); - await createWorkflow({ active: true }, owner); - - await activeWorkflowRunner.init(); - - const inStorage = await activeWorkflowsService.getAllActiveIdsInStorage(); - expect(inStorage).toHaveLength(2); - - const inMemory = activeWorkflowRunner.allActiveInMemory(); - expect(inMemory).toHaveLength(2); - }); - - test('should pre-check that every workflow can be activated', async () => { - await createWorkflow({ active: true }, owner); - await createWorkflow({ active: true }, owner); - - const precheckSpy = jest - .spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated') - .mockReturnValue(true); - - await activeWorkflowRunner.init(); - - expect(precheckSpy).toHaveBeenCalledTimes(2); - }); -}); - -describe('removeAll()', () => { - test('should remove all active workflows from memory', async () => { - await createWorkflow({ active: true }, owner); - await createWorkflow({ active: true }, owner); - - await activeWorkflowRunner.init(); - await activeWorkflowRunner.removeAll(); - - const inMemory = activeWorkflowRunner.allActiveInMemory(); - expect(inMemory).toHaveLength(0); - }); -}); - -describe('remove()', () => { - test('should call `ActiveWorkflowRunner.clearWebhooks()`', async () => { - const workflow = await createWorkflow({ active: true }, owner); - const clearWebhooksSpy = jest.spyOn(activeWorkflowRunner, 'clearWebhooks'); - - await activeWorkflowRunner.init(); - await activeWorkflowRunner.remove(workflow.id); - - expect(clearWebhooksSpy).toHaveBeenCalledTimes(1); - }); -}); - -describe('isActive()', () => { - test('should return `true` for active workflow in storage', async () => { - const workflow = await createWorkflow({ active: true }, owner); - - await activeWorkflowRunner.init(); - - const isActiveInStorage = activeWorkflowRunner.isActive(workflow.id); - await expect(isActiveInStorage).resolves.toBe(true); - }); - - test('should return `false` for inactive workflow in storage', async () => { - const workflow = await createWorkflow({ active: false }, owner); - - await activeWorkflowRunner.init(); - - const isActiveInStorage = activeWorkflowRunner.isActive(workflow.id); - await expect(isActiveInStorage).resolves.toBe(false); - }); -}); - -describe('executeErrorWorkflow()', () => { - test('should call `WorkflowExecuteAdditionalData.executeErrorWorkflow()`', async () => { - const workflow = await createWorkflow({ active: true }, owner); - const [node] = workflow.nodes; - const error = new NodeOperationError(node, 'Fake error message'); - const executeSpy = jest.spyOn(AdditionalData, 'executeErrorWorkflow'); - - await activeWorkflowRunner.init(); - activeWorkflowRunner.executeErrorWorkflow(error, workflow, 'trigger'); - - expect(executeSpy).toHaveBeenCalledTimes(1); - }); - - test('should be called on failure to activate due to 401', async () => { - const storedWorkflow = await createWorkflow({ active: true }, owner); - const [node] = storedWorkflow.nodes; - const executeSpy = jest.spyOn(activeWorkflowRunner, 'executeErrorWorkflow'); - - jest.spyOn(activeWorkflowRunner, 'add').mockImplementation(() => { - throw new NodeApiError(node, { - httpCode: '401', - message: 'Authorization failed - please check your credentials', - }); - }); - - await activeWorkflowRunner.init(); - - expect(executeSpy).toHaveBeenCalledTimes(1); - const [error, workflow] = executeSpy.mock.calls[0]; - expect(error.message).toContain('Authorization'); - expect(workflow.id).toBe(storedWorkflow.id); - }); -}); - -describe('addWebhooks()', () => { - test('should call `WebhookService.storeWebhook()`', async () => { - const mockWebhook = { path: 'fake-path' } as unknown as IWebhookData; - const mockWebhookEntity = { webhookPath: 'fake-path' } as unknown as WebhookEntity; - - jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([mockWebhook]); - webhookService.createWebhook.mockReturnValue(mockWebhookEntity); - - const additionalData = await AdditionalData.getBase('fake-user-id'); - - const dbWorkflow = await createWorkflow({ active: true }, owner); - - const workflow = new Workflow({ - id: dbWorkflow.id, - name: dbWorkflow.name, - nodes: dbWorkflow.nodes, - connections: dbWorkflow.connections, - active: dbWorkflow.active, - nodeTypes: Container.get(NodeTypes), - staticData: dbWorkflow.staticData, - settings: dbWorkflow.settings, - }); - - const [node] = dbWorkflow.nodes; - - jest.spyOn(Workflow.prototype, 'getNode').mockReturnValue(node); - jest.spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated').mockReturnValue(true); - jest.spyOn(Workflow.prototype, 'createWebhookIfNotExists').mockResolvedValue(undefined); - - await activeWorkflowRunner.addWebhooks(workflow, additionalData, 'trigger', 'init'); - - expect(webhookService.storeWebhook).toHaveBeenCalledTimes(1); - }); -}); diff --git a/packages/cli/test/integration/active-workflow-runner.test.ts b/packages/cli/test/integration/active-workflow-runner.test.ts new file mode 100644 index 0000000000..5a0a35bf5f --- /dev/null +++ b/packages/cli/test/integration/active-workflow-runner.test.ts @@ -0,0 +1,277 @@ +import { Container } from 'typedi'; +import { mock } from 'jest-mock-extended'; +import { NodeApiError, NodeOperationError, Workflow } from 'n8n-workflow'; +import type { IWebhookData, WorkflowActivateMode } from 'n8n-workflow'; + +import { ActiveExecutions } from '@/ActiveExecutions'; +import { ActiveWorkflowRunner } from '@/ActiveWorkflowRunner'; +import { ExternalHooks } from '@/ExternalHooks'; +import { Push } from '@/push'; +import { SecretsHelper } from '@/SecretsHelpers'; +import { WebhookService } from '@/services/webhook.service'; +import * as WebhookHelpers from '@/WebhookHelpers'; +import * as AdditionalData from '@/WorkflowExecuteAdditionalData'; +import type { WebhookEntity } from '@db/entities/WebhookEntity'; +import { NodeTypes } from '@/NodeTypes'; +import { ExecutionService } from '@/executions/execution.service'; +import { WorkflowService } from '@/workflows/workflow.service'; + +import { mockInstance } from '../shared/mocking'; +import * as testDb from './shared/testDb'; +import { createOwner } from './shared/db/users'; +import { createWorkflow } from './shared/db/workflows'; +import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; +import type { WorkflowEntity } from '@/databases/entities/WorkflowEntity'; + +mockInstance(ActiveExecutions); +mockInstance(Push); +mockInstance(SecretsHelper); +mockInstance(ExecutionService); +mockInstance(WorkflowService); + +const loader = mockInstance(LoadNodesAndCredentials); + +Object.assign(loader.loadedNodes, { + 'n8n-nodes-base.scheduleTrigger': { + type: { + description: { + displayName: 'Schedule Trigger', + name: 'scheduleTrigger', + properties: [], + }, + trigger: async () => {}, + }, + }, +}); + +const webhookService = mockInstance(WebhookService); +const externalHooks = mockInstance(ExternalHooks); + +let runner: ActiveWorkflowRunner; + +let createActiveWorkflow: () => Promise; +let createInactiveWorkflow: () => Promise; + +beforeAll(async () => { + await testDb.init(); + + runner = Container.get(ActiveWorkflowRunner); + + const owner = await createOwner(); + createActiveWorkflow = async () => await createWorkflow({ active: true }, owner); + createInactiveWorkflow = async () => await createWorkflow({ active: false }, owner); +}); + +afterEach(async () => { + await testDb.truncate(['Workflow', 'Webhook']); + await runner.removeAll(); + jest.restoreAllMocks(); +}); + +afterAll(async () => { + await testDb.terminate(); +}); + +describe('init()', () => { + it('should load workflows into memory', async () => { + await runner.init(); + + expect(runner.allActiveInMemory()).toHaveLength(0); + + await createActiveWorkflow(); + await runner.init(); + + expect(runner.allActiveInMemory()).toHaveLength(1); + }); + + it('should call external hook', async () => { + await runner.init(); + + const [hook, arg] = externalHooks.run.mock.calls[0]; + + expect(hook).toBe('activeWorkflows.initialized'); + expect(arg).toBeEmptyArray(); + }); + + it('should check that workflow can be activated', async () => { + await Promise.all([createActiveWorkflow(), createActiveWorkflow()]); + + const checkSpy = jest + .spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated') + .mockReturnValue(true); + + await runner.init(); + + expect(checkSpy).toHaveBeenCalledTimes(2); + }); +}); + +describe('isActive()', () => { + it('should return `true` for active workflow in storage', async () => { + const dbWorkflow = await createActiveWorkflow(); + + await runner.init(); + + await expect(runner.isActive(dbWorkflow.id)).resolves.toBe(true); + }); + + it('should return `false` for inactive workflow in storage', async () => { + const dbWorkflow = await createInactiveWorkflow(); + + await runner.init(); + + await expect(runner.isActive(dbWorkflow.id)).resolves.toBe(false); + }); +}); + +describe('add()', () => { + describe('in single-main mode', () => { + test.each(['activate', 'update'])( + "should add webhooks, triggers and pollers for workflow in '%s' activation mode", + async (mode: WorkflowActivateMode) => { + await runner.init(); + + const dbWorkflow = await createActiveWorkflow(); + const addWebhooksSpy = jest.spyOn(runner, 'addWebhooks'); + const addTriggersAndPollersSpy = jest.spyOn(runner, 'addTriggersAndPollers'); + + await runner.add(dbWorkflow.id, mode); + + const [argWorkflow] = addWebhooksSpy.mock.calls[0]; + const [_, _argWorkflow] = addTriggersAndPollersSpy.mock.calls[0]; + + expect(addWebhooksSpy).toHaveBeenCalledTimes(1); + expect(addTriggersAndPollersSpy).toHaveBeenCalledTimes(1); + + if (!(argWorkflow instanceof Workflow)) fail(); + if (!(_argWorkflow instanceof Workflow)) fail(); + + expect(argWorkflow.id).toBe(dbWorkflow.id); + expect(_argWorkflow.id).toBe(dbWorkflow.id); + }, + ); + }); +}); + +describe('removeAll()', () => { + it('should remove all active workflows from memory', async () => { + await createActiveWorkflow(); + await createActiveWorkflow(); + + await runner.init(); + await runner.removeAll(); + + expect(runner.allActiveInMemory()).toHaveLength(0); + }); +}); + +describe('remove()', () => { + describe('in single-main mode', () => { + it('should remove all webhooks of a workflow from database', async () => { + const dbWorkflow = await createActiveWorkflow(); + + await runner.init(); + await runner.remove(dbWorkflow.id); + + expect(webhookService.deleteWorkflowWebhooks).toHaveBeenCalledTimes(1); + }); + + it('should remove all webhooks of a workflow from external service', async () => { + const dbWorkflow = await createActiveWorkflow(); + const deleteWebhookSpy = jest.spyOn(Workflow.prototype, 'deleteWebhook'); + jest + .spyOn(WebhookHelpers, 'getWorkflowWebhooks') + .mockReturnValue([mock({ path: 'some-path' })]); + + await runner.init(); + await runner.remove(dbWorkflow.id); + + expect(deleteWebhookSpy).toHaveBeenCalledTimes(1); + }); + + it('should stop running triggers and pollers', async () => { + const dbWorkflow = await createActiveWorkflow(); + const removeTriggersAndPollersSpy = jest.spyOn(runner, 'removeWorkflowTriggersAndPollers'); + + await runner.init(); + await runner.remove(dbWorkflow.id); + + expect(removeTriggersAndPollersSpy).toHaveBeenCalledTimes(1); + }); + }); +}); + +describe('executeErrorWorkflow()', () => { + it('should delegate to `WorkflowExecuteAdditionalData`', async () => { + const dbWorkflow = await createActiveWorkflow(); + const [node] = dbWorkflow.nodes; + + const executeSpy = jest.spyOn(AdditionalData, 'executeErrorWorkflow'); + + await runner.init(); + + runner.executeErrorWorkflow( + new NodeOperationError(node, 'Something went wrong'), + dbWorkflow, + 'trigger', + ); + + expect(executeSpy).toHaveBeenCalledTimes(1); + }); + + it('should be called on failure to activate due to 401', async () => { + const dbWorkflow = await createActiveWorkflow(); + const [node] = dbWorkflow.nodes; + const executeSpy = jest.spyOn(runner, 'executeErrorWorkflow'); + + jest.spyOn(runner, 'add').mockImplementation(() => { + throw new NodeApiError(node, { + httpCode: '401', + message: 'Authorization failed - please check your credentials', + }); + }); + + await runner.init(); + + expect(executeSpy).toHaveBeenCalledTimes(1); + const [error, _dbWorkflow] = executeSpy.mock.calls[0]; + expect(error.message).toContain('Authorization'); + expect(_dbWorkflow.id).toBe(dbWorkflow.id); + }); +}); + +describe('addWebhooks()', () => { + it('should call `WebhookService.storeWebhook()`', async () => { + const webhook = mock({ path: 'some-path' }); + const webhookEntity = mock({ webhookPath: 'some-path' }); + + jest.spyOn(WebhookHelpers, 'getWorkflowWebhooks').mockReturnValue([webhook]); + + webhookService.createWebhook.mockReturnValue(webhookEntity); + + const additionalData = await AdditionalData.getBase('some-user-id'); + + const dbWorkflow = await createActiveWorkflow(); + + const workflow = new Workflow({ + id: dbWorkflow.id, + name: dbWorkflow.name, + nodes: dbWorkflow.nodes, + connections: dbWorkflow.connections, + active: dbWorkflow.active, + nodeTypes: Container.get(NodeTypes), + staticData: dbWorkflow.staticData, + settings: dbWorkflow.settings, + }); + + const [node] = dbWorkflow.nodes; + + jest.spyOn(Workflow.prototype, 'getNode').mockReturnValue(node); + jest.spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated').mockReturnValue(true); + jest.spyOn(Workflow.prototype, 'createWebhookIfNotExists').mockResolvedValue(undefined); + + await runner.addWebhooks(workflow, additionalData, 'trigger', 'init'); + + expect(webhookService.storeWebhook).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/cli/test/integration/shared/utils/index.ts b/packages/cli/test/integration/shared/utils/index.ts index e2c2225b8b..2558c461ac 100644 --- a/packages/cli/test/integration/shared/utils/index.ts +++ b/packages/cli/test/integration/shared/utils/index.ts @@ -18,7 +18,6 @@ import { LoadNodesAndCredentials } from '@/LoadNodesAndCredentials'; import { Push } from '@/push'; import { OrchestrationService } from '@/services/orchestration.service'; -import { mockNodeTypesData } from '../../../unit/Helpers'; import { mockInstance } from '../../../shared/mocking'; export { setupTestServer } from './testServer'; @@ -177,15 +176,3 @@ export function makeWorkflow(options?: { } export const MOCK_PINDATA = { Spotify: [{ json: { myKey: 'myValue' } }] }; - -export function setSchedulerAsLoadedNode() { - const nodesAndCredentials = mockInstance(LoadNodesAndCredentials); - - Object.assign(nodesAndCredentials, { - loadedNodes: mockNodeTypesData(['scheduleTrigger'], { - addTrigger: true, - }), - known: { nodes: {}, credentials: {} }, - types: { nodes: [], credentials: [] }, - }); -}