From 5a055ed5267996be99a6ee8c504c21d0a221ee3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Thu, 12 Dec 2024 13:54:44 +0100 Subject: [PATCH] refactor(core): Move execution engine code out of n8n-workflow (no-changelog) (#12147) --- .../trigger/ChatTrigger/ChatTrigger.node.ts | 13 +- .../__tests__/active-workflow-manager.test.ts | 125 ++++ .../load-nodes-and-credentials.test.ts | 177 ++++++ packages/cli/src/__tests__/node-types.test.ts | 5 +- packages/cli/src/active-workflow-manager.ts | 55 +- .../cli/src/load-nodes-and-credentials.ts | 100 +++- packages/cli/src/node-types.ts | 2 +- .../services/credentials-tester.service.ts | 11 +- .../dynamic-node-parameters.service.ts | 16 +- .../webhooks/__tests__/test-webhooks.test.ts | 24 +- .../webhooks/__tests__/waiting-forms.test.ts | 2 +- .../__tests__/waiting-webhooks.test.ts | 2 +- .../__tests__/webhook.service.test.ts | 184 +++++- packages/cli/src/webhooks/live-webhooks.ts | 10 +- packages/cli/src/webhooks/test-webhooks.ts | 7 +- packages/cli/src/webhooks/waiting-webhooks.ts | 23 +- packages/cli/src/webhooks/webhook-helpers.ts | 15 +- packages/cli/src/webhooks/webhook.service.ts | 229 ++++++- .../active-workflow-manager.test.ts | 83 +-- packages/core/src/ActiveWorkflows.ts | 7 +- packages/core/src/Constants.ts | 30 + packages/core/src/DirectoryLoader.ts | 32 +- packages/core/src/NodeExecuteFunctions.ts | 114 +--- .../{workflow => core}/src/RoutingNode.ts | 57 +- packages/core/src/TriggersAndPollers.ts | 116 ++++ packages/core/src/WorkflowExecute.ts | 300 +++++++++- packages/core/src/index.ts | 1 + .../core/test/NodeExecuteFunctions.test.ts | 1 - .../test/RoutingNode.test.ts | 164 +++-- packages/core/test/TriggersAndPollers.test.ts | 157 +++++ packages/core/test/WorkflowExecute.test.ts | 162 ++++- packages/core/test/helpers/constants.ts | 83 +++ packages/core/test/helpers/index.ts | 4 +- .../editor-ui/src/components/NodeSettings.vue | 2 +- .../src/composables/useNodeHelpers.test.ts | 33 + .../src/composables/useNodeHelpers.ts | 46 ++ packages/workflow/src/Constants.ts | 31 - packages/workflow/src/Interfaces.ts | 58 -- packages/workflow/src/NodeHelpers.ts | 291 +-------- packages/workflow/src/Workflow.ts | 565 +----------------- packages/workflow/src/index.ts | 1 - packages/workflow/test/Helpers.ts | 56 +- packages/workflow/test/NodeHelpers.test.ts | 203 ------- packages/workflow/test/Workflow.test.ts | 193 +----- 44 files changed, 1995 insertions(+), 1795 deletions(-) create mode 100644 packages/cli/src/__tests__/active-workflow-manager.test.ts rename packages/{workflow => core}/src/RoutingNode.ts (97%) create mode 100644 packages/core/src/TriggersAndPollers.ts rename packages/{workflow => core}/test/RoutingNode.test.ts (95%) create mode 100644 packages/core/test/TriggersAndPollers.test.ts diff --git a/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/ChatTrigger.node.ts b/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/ChatTrigger.node.ts index 489b4fe28b..27fb1bcd35 100644 --- a/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/ChatTrigger.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/trigger/ChatTrigger/ChatTrigger.node.ts @@ -1,6 +1,6 @@ import type { BaseChatMemory } from '@langchain/community/memory/chat_memory'; import { pick } from 'lodash'; -import { Node, NodeConnectionType, commonCORSParameters } from 'n8n-workflow'; +import { Node, NodeConnectionType } from 'n8n-workflow'; import type { IDataObject, IWebhookFunctions, @@ -241,14 +241,19 @@ export class ChatTrigger extends Node { default: {}, options: [ // CORS parameters are only valid for when chat is used in hosted or webhook mode - ...commonCORSParameters.map((p) => ({ - ...p, + { + displayName: 'Allowed Origins (CORS)', + name: 'allowedOrigins', + type: 'string', + default: '*', + description: + 'Comma-separated list of URLs allowed for cross-origin non-preflight requests. Use * (default) to allow all origins.', displayOptions: { show: { '/mode': ['hostedChat', 'webhook'], }, }, - })), + }, { ...allowFileUploadsOption, displayOptions: { diff --git a/packages/cli/src/__tests__/active-workflow-manager.test.ts b/packages/cli/src/__tests__/active-workflow-manager.test.ts new file mode 100644 index 0000000000..a167f1e5a5 --- /dev/null +++ b/packages/cli/src/__tests__/active-workflow-manager.test.ts @@ -0,0 +1,125 @@ +import { mock } from 'jest-mock-extended'; +import type { InstanceSettings } from 'n8n-core'; +import type { + WorkflowParameters, + INode, + INodeType, + INodeTypeDescription, + WorkflowActivateMode, +} from 'n8n-workflow'; +import { Workflow } from 'n8n-workflow'; + +import { ActiveWorkflowManager } from '@/active-workflow-manager'; +import type { NodeTypes } from '@/node-types'; + +describe('ActiveWorkflowManager', () => { + let activeWorkflowManager: ActiveWorkflowManager; + const instanceSettings = mock(); + const nodeTypes = mock(); + + beforeEach(() => { + jest.clearAllMocks(); + activeWorkflowManager = new ActiveWorkflowManager( + mock(), + mock(), + mock(), + mock(), + mock(), + nodeTypes, + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + mock(), + instanceSettings, + mock(), + ); + }); + + describe('checkIfWorkflowCanBeActivated', () => { + const disabledNode = mock({ type: 'triggerNode', disabled: true }); + const unknownNode = mock({ type: 'unknownNode' }); + const noTriggersNode = mock({ type: 'noTriggersNode' }); + const pollNode = mock({ type: 'pollNode' }); + const triggerNode = mock({ type: 'triggerNode' }); + const webhookNode = mock({ type: 'webhookNode' }); + + nodeTypes.getByNameAndVersion.mockImplementation((type) => { + // TODO: getByNameAndVersion signature needs to be updated to allow returning undefined + if (type === 'unknownNode') return undefined as unknown as INodeType; + const partial: Partial = { + poll: undefined, + trigger: undefined, + webhook: undefined, + description: mock({ + properties: [], + }), + }; + if (type === 'pollNode') partial.poll = jest.fn(); + if (type === 'triggerNode') partial.trigger = jest.fn(); + if (type === 'webhookNode') partial.webhook = jest.fn(); + return mock(partial); + }); + + test.each([ + ['should skip disabled nodes', disabledNode, [], false], + ['should skip nodes marked as ignored', triggerNode, ['triggerNode'], false], + ['should skip unknown nodes', unknownNode, [], false], + ['should skip nodes with no trigger method', noTriggersNode, [], false], + ['should activate if poll method exists', pollNode, [], true], + ['should activate if trigger method exists', triggerNode, [], true], + ['should activate if webhook method exists', webhookNode, [], true], + ])('%s', async (_, node, ignoredNodes, expected) => { + const workflow = new Workflow(mock({ nodeTypes, nodes: [node] })); + const canBeActivated = activeWorkflowManager.checkIfWorkflowCanBeActivated( + workflow, + ignoredNodes, + ); + expect(canBeActivated).toBe(expected); + }); + }); + + describe('shouldAddWebhooks', () => { + describe('if leader', () => { + beforeAll(() => { + Object.assign(instanceSettings, { isLeader: true, isFollower: false }); + }); + + test('should return `true` for `init`', () => { + // ensure webhooks are populated on init: https://github.com/n8n-io/n8n/pull/8830 + const result = activeWorkflowManager.shouldAddWebhooks('init'); + expect(result).toBe(true); + }); + + test('should return `false` for `leadershipChange`', () => { + const result = activeWorkflowManager.shouldAddWebhooks('leadershipChange'); + expect(result).toBe(false); + }); + + test('should return `true` for `update` or `activate`', () => { + const modes = ['update', 'activate'] as WorkflowActivateMode[]; + for (const mode of modes) { + const result = activeWorkflowManager.shouldAddWebhooks(mode); + expect(result).toBe(true); + } + }); + }); + + describe('if follower', () => { + beforeAll(() => { + Object.assign(instanceSettings, { isLeader: false, isFollower: true }); + }); + + test('should return `false` for `update` or `activate`', () => { + const modes = ['update', 'activate'] as WorkflowActivateMode[]; + for (const mode of modes) { + const result = activeWorkflowManager.shouldAddWebhooks(mode); + expect(result).toBe(false); + } + }); + }); + }); +}); diff --git a/packages/cli/src/__tests__/load-nodes-and-credentials.test.ts b/packages/cli/src/__tests__/load-nodes-and-credentials.test.ts index ec8fd06977..75aa602301 100644 --- a/packages/cli/src/__tests__/load-nodes-and-credentials.test.ts +++ b/packages/cli/src/__tests__/load-nodes-and-credentials.test.ts @@ -1,5 +1,7 @@ import { mock } from 'jest-mock-extended'; import type { DirectoryLoader } from 'n8n-core'; +import type { INodeProperties, INodeTypeDescription } from 'n8n-workflow'; +import { NodeConnectionType } from 'n8n-workflow'; import { LoadNodesAndCredentials } from '../load-nodes-and-credentials'; @@ -34,4 +36,179 @@ describe('LoadNodesAndCredentials', () => { expect(result).toBeUndefined(); }); }); + + describe('convertNodeToAiTool', () => { + const instance = new LoadNodesAndCredentials(mock(), mock(), mock(), mock()); + + let fullNodeWrapper: { description: INodeTypeDescription }; + + beforeEach(() => { + fullNodeWrapper = { + description: { + displayName: 'Test Node', + name: 'testNode', + group: ['test'], + description: 'A test node', + version: 1, + defaults: {}, + inputs: [NodeConnectionType.Main], + outputs: [NodeConnectionType.Main], + properties: [], + }, + }; + }); + + it('should modify the name and displayName correctly', () => { + const result = instance.convertNodeToAiTool(fullNodeWrapper); + expect(result.description.name).toBe('testNodeTool'); + expect(result.description.displayName).toBe('Test Node Tool'); + }); + + it('should update inputs and outputs', () => { + const result = instance.convertNodeToAiTool(fullNodeWrapper); + expect(result.description.inputs).toEqual([]); + expect(result.description.outputs).toEqual([NodeConnectionType.AiTool]); + }); + + it('should remove the usableAsTool property', () => { + fullNodeWrapper.description.usableAsTool = true; + const result = instance.convertNodeToAiTool(fullNodeWrapper); + expect(result.description.usableAsTool).toBeUndefined(); + }); + + it("should add toolDescription property if it doesn't exist", () => { + const result = instance.convertNodeToAiTool(fullNodeWrapper); + const toolDescriptionProp = result.description.properties.find( + (prop) => prop.name === 'toolDescription', + ); + expect(toolDescriptionProp).toBeDefined(); + expect(toolDescriptionProp?.type).toBe('string'); + expect(toolDescriptionProp?.default).toBe(fullNodeWrapper.description.description); + }); + + it('should set codex categories correctly', () => { + const result = instance.convertNodeToAiTool(fullNodeWrapper); + expect(result.description.codex).toEqual({ + categories: ['AI'], + subcategories: { + AI: ['Tools'], + Tools: ['Other Tools'], + }, + resources: {}, + }); + }); + + it('should preserve existing properties', () => { + const existingProp: INodeProperties = { + displayName: 'Existing Prop', + name: 'existingProp', + type: 'string', + default: 'test', + }; + fullNodeWrapper.description.properties = [existingProp]; + const result = instance.convertNodeToAiTool(fullNodeWrapper); + expect(result.description.properties).toHaveLength(3); // Existing prop + toolDescription + notice + expect(result.description.properties).toContainEqual(existingProp); + }); + + it('should handle nodes with resource property', () => { + const resourceProp: INodeProperties = { + displayName: 'Resource', + name: 'resource', + type: 'options', + options: [{ name: 'User', value: 'user' }], + default: 'user', + }; + fullNodeWrapper.description.properties = [resourceProp]; + const result = instance.convertNodeToAiTool(fullNodeWrapper); + expect(result.description.properties[1].name).toBe('descriptionType'); + expect(result.description.properties[2].name).toBe('toolDescription'); + expect(result.description.properties[3]).toEqual(resourceProp); + }); + + it('should handle nodes with operation property', () => { + const operationProp: INodeProperties = { + displayName: 'Operation', + name: 'operation', + type: 'options', + options: [{ name: 'Create', value: 'create' }], + default: 'create', + }; + fullNodeWrapper.description.properties = [operationProp]; + const result = instance.convertNodeToAiTool(fullNodeWrapper); + expect(result.description.properties[1].name).toBe('descriptionType'); + expect(result.description.properties[2].name).toBe('toolDescription'); + expect(result.description.properties[3]).toEqual(operationProp); + }); + + it('should handle nodes with both resource and operation properties', () => { + const resourceProp: INodeProperties = { + displayName: 'Resource', + name: 'resource', + type: 'options', + options: [{ name: 'User', value: 'user' }], + default: 'user', + }; + const operationProp: INodeProperties = { + displayName: 'Operation', + name: 'operation', + type: 'options', + options: [{ name: 'Create', value: 'create' }], + default: 'create', + }; + fullNodeWrapper.description.properties = [resourceProp, operationProp]; + const result = instance.convertNodeToAiTool(fullNodeWrapper); + expect(result.description.properties[1].name).toBe('descriptionType'); + expect(result.description.properties[2].name).toBe('toolDescription'); + expect(result.description.properties[3]).toEqual(resourceProp); + expect(result.description.properties[4]).toEqual(operationProp); + }); + + it('should handle nodes with empty properties', () => { + fullNodeWrapper.description.properties = []; + const result = instance.convertNodeToAiTool(fullNodeWrapper); + expect(result.description.properties).toHaveLength(2); + expect(result.description.properties[1].name).toBe('toolDescription'); + }); + + it('should handle nodes with existing codex property', () => { + fullNodeWrapper.description.codex = { + categories: ['Existing'], + subcategories: { + Existing: ['Category'], + }, + resources: { + primaryDocumentation: [{ url: 'https://example.com' }], + }, + }; + const result = instance.convertNodeToAiTool(fullNodeWrapper); + expect(result.description.codex).toEqual({ + categories: ['AI'], + subcategories: { + AI: ['Tools'], + Tools: ['Other Tools'], + }, + resources: { + primaryDocumentation: [{ url: 'https://example.com' }], + }, + }); + }); + + it('should handle nodes with very long names', () => { + fullNodeWrapper.description.name = 'veryLongNodeNameThatExceedsNormalLimits'.repeat(10); + fullNodeWrapper.description.displayName = + 'Very Long Node Name That Exceeds Normal Limits'.repeat(10); + const result = instance.convertNodeToAiTool(fullNodeWrapper); + expect(result.description.name.endsWith('Tool')).toBe(true); + expect(result.description.displayName.endsWith('Tool')).toBe(true); + }); + + it('should handle nodes with special characters in name and displayName', () => { + fullNodeWrapper.description.name = 'special@#$%Node'; + fullNodeWrapper.description.displayName = 'Special @#$% Node'; + const result = instance.convertNodeToAiTool(fullNodeWrapper); + expect(result.description.name).toBe('special@#$%NodeTool'); + expect(result.description.displayName).toBe('Special @#$% Node Tool'); + }); + }); }); diff --git a/packages/cli/src/__tests__/node-types.test.ts b/packages/cli/src/__tests__/node-types.test.ts index 6b1984a79f..78d0c5e18a 100644 --- a/packages/cli/src/__tests__/node-types.test.ts +++ b/packages/cli/src/__tests__/node-types.test.ts @@ -7,7 +7,7 @@ import type { INodeTypeDescription, } from 'n8n-workflow'; -import type { LoadNodesAndCredentials } from '@/load-nodes-and-credentials'; +import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials'; import { NodeTypes } from '@/node-types'; describe('NodeTypes', () => { @@ -104,6 +104,9 @@ describe('NodeTypes', () => { }); it('should return the tool node-type when requested as tool', () => { + // @ts-expect-error don't mock convertNodeToAiTool for now + loadNodesAndCredentials.convertNodeToAiTool = + LoadNodesAndCredentials.prototype.convertNodeToAiTool; const result = nodeTypes.getByNameAndVersion('n8n-nodes-base.testNodeTool'); expect(result).not.toEqual(toolSupportingNode); expect(result.description.name).toEqual('n8n-nodes-base.testNodeTool'); diff --git a/packages/cli/src/active-workflow-manager.ts b/packages/cli/src/active-workflow-manager.ts index 7e3d045fe4..6ef3753af7 100644 --- a/packages/cli/src/active-workflow-manager.ts +++ b/packages/cli/src/active-workflow-manager.ts @@ -1,10 +1,8 @@ /* eslint-disable @typescript-eslint/no-unsafe-member-access */ - import { ActiveWorkflows, ErrorReporter, InstanceSettings, - NodeExecuteFunctions, PollContext, TriggerContext, } from 'n8n-core'; @@ -186,12 +184,7 @@ export class ActiveWorkflowManager { try { // TODO: this should happen in a transaction, that way we don't need to manually remove this in `catch` await this.webhookService.storeWebhook(webhook); - await workflow.createWebhookIfNotExists( - webhookData, - NodeExecuteFunctions, - mode, - activation, - ); + await this.webhookService.createWebhookIfNotExists(workflow, webhookData, mode, activation); } catch (error) { if (activation === 'init' && error.name === 'QueryFailedError') { // n8n does not remove the registered webhooks on exit. @@ -261,7 +254,7 @@ export class ActiveWorkflowManager { const webhooks = WebhookHelpers.getWorkflowWebhooks(workflow, additionalData, undefined, true); for (const webhookData of webhooks) { - await workflow.deleteWebhook(webhookData, NodeExecuteFunctions, mode, 'update'); + await this.webhookService.deleteWebhook(workflow, webhookData, mode, 'update'); } await this.workflowStaticDataService.saveStaticData(workflow); @@ -557,7 +550,7 @@ export class ActiveWorkflowManager { settings: dbWorkflow.settings, }); - const canBeActivated = workflow.checkIfWorkflowCanBeActivated(STARTING_NODES); + const canBeActivated = this.checkIfWorkflowCanBeActivated(workflow, STARTING_NODES); if (!canBeActivated) { throw new WorkflowActivationError( @@ -601,6 +594,48 @@ export class ActiveWorkflowManager { return shouldDisplayActivationMessage; } + /** + * A workflow can only be activated if it has a node which has either triggers + * or webhooks defined. + * + * @param {string[]} [ignoreNodeTypes] Node-types to ignore in the check + */ + checkIfWorkflowCanBeActivated(workflow: Workflow, ignoreNodeTypes?: string[]): boolean { + let node: INode; + let nodeType: INodeType | undefined; + + for (const nodeName of Object.keys(workflow.nodes)) { + node = workflow.nodes[nodeName]; + + if (node.disabled === true) { + // Deactivated nodes can not trigger a run so ignore + continue; + } + + if (ignoreNodeTypes !== undefined && ignoreNodeTypes.includes(node.type)) { + continue; + } + + nodeType = this.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); + + if (nodeType === undefined) { + // Type is not known so check is not possible + continue; + } + + if ( + nodeType.poll !== undefined || + nodeType.trigger !== undefined || + nodeType.webhook !== undefined + ) { + // Is a trigger node. So workflow can be activated. + return true; + } + } + + return false; + } + /** * Count all triggers in the workflow, excluding Manual Trigger. */ diff --git a/packages/cli/src/load-nodes-and-credentials.ts b/packages/cli/src/load-nodes-and-credentials.ts index 5a46d6c70d..f6f66d024b 100644 --- a/packages/cli/src/load-nodes-and-credentials.ts +++ b/packages/cli/src/load-nodes-and-credentials.ts @@ -22,8 +22,9 @@ import type { ICredentialType, INodeType, IVersionedNodeType, + INodeProperties, } from 'n8n-workflow'; -import { NodeHelpers, ApplicationError } from 'n8n-workflow'; +import { ApplicationError, NodeConnectionType } from 'n8n-workflow'; import path from 'path'; import picocolors from 'picocolors'; import { Container, Service } from 'typedi'; @@ -293,7 +294,7 @@ export class LoadNodesAndCredentials { for (const usableNode of usableNodes) { const description: INodeTypeBaseDescription | INodeTypeDescription = structuredClone(usableNode); - const wrapped = NodeHelpers.convertNodeToAiTool({ description }).description; + const wrapped = this.convertNodeToAiTool({ description }).description; this.types.nodes.push(wrapped); this.known.nodes[wrapped.name] = structuredClone(this.known.nodes[usableNode.name]); @@ -396,6 +397,101 @@ export class LoadNodesAndCredentials { throw new UnrecognizedCredentialTypeError(credentialType); } + /** + * Modifies the description of the passed in object, such that it can be used + * as an AI Agent Tool. + * Returns the modified item (not copied) + */ + convertNodeToAiTool< + T extends object & { description: INodeTypeDescription | INodeTypeBaseDescription }, + >(item: T): T { + // quick helper function for type-guard down below + function isFullDescription(obj: unknown): obj is INodeTypeDescription { + return typeof obj === 'object' && obj !== null && 'properties' in obj; + } + + if (isFullDescription(item.description)) { + item.description.name += 'Tool'; + item.description.inputs = []; + item.description.outputs = [NodeConnectionType.AiTool]; + item.description.displayName += ' Tool'; + delete item.description.usableAsTool; + + const hasResource = item.description.properties.some((prop) => prop.name === 'resource'); + const hasOperation = item.description.properties.some((prop) => prop.name === 'operation'); + + if (!item.description.properties.map((prop) => prop.name).includes('toolDescription')) { + const descriptionType: INodeProperties = { + displayName: 'Tool Description', + name: 'descriptionType', + type: 'options', + noDataExpression: true, + options: [ + { + name: 'Set Automatically', + value: 'auto', + description: 'Automatically set based on resource and operation', + }, + { + name: 'Set Manually', + value: 'manual', + description: 'Manually set the description', + }, + ], + default: 'auto', + }; + + const descProp: INodeProperties = { + displayName: 'Description', + name: 'toolDescription', + type: 'string', + default: item.description.description, + required: true, + typeOptions: { rows: 2 }, + description: + 'Explain to the LLM what this tool does, a good, specific description would allow LLMs to produce expected results much more often', + placeholder: `e.g. ${item.description.description}`, + }; + + const noticeProp: INodeProperties = { + displayName: + "Use the expression {{ $fromAI('placeholder_name') }} for any data to be filled by the model", + name: 'notice', + type: 'notice', + default: '', + }; + + item.description.properties.unshift(descProp); + + // If node has resource or operation we can determine pre-populate tool description based on it + // so we add the descriptionType property as the first property + if (hasResource || hasOperation) { + item.description.properties.unshift(descriptionType); + + descProp.displayOptions = { + show: { + descriptionType: ['manual'], + }, + }; + } + + item.description.properties.unshift(noticeProp); + } + } + + const resources = item.description.codex?.resources ?? {}; + + item.description.codex = { + categories: ['AI'], + subcategories: { + AI: ['Tools'], + Tools: ['Other Tools'], + }, + resources, + }; + return item; + } + async setupHotReload() { const { default: debounce } = await import('lodash/debounce'); // eslint-disable-next-line import/no-extraneous-dependencies diff --git a/packages/cli/src/node-types.ts b/packages/cli/src/node-types.ts index 72584ac502..1837cd7bcc 100644 --- a/packages/cli/src/node-types.ts +++ b/packages/cli/src/node-types.ts @@ -59,7 +59,7 @@ export class NodeTypes implements INodeTypes { const clonedNode = Object.create(versionedNodeType, { description: { value: clonedDescription }, }) as INodeType; - const tool = NodeHelpers.convertNodeToAiTool(clonedNode); + const tool = this.loadNodesAndCredentials.convertNodeToAiTool(clonedNode); loadedNodes[nodeType + 'Tool'] = { sourcePath: '', type: tool }; return tool; } diff --git a/packages/cli/src/services/credentials-tester.service.ts b/packages/cli/src/services/credentials-tester.service.ts index 4c3b574d6f..4a999d6541 100644 --- a/packages/cli/src/services/credentials-tester.service.ts +++ b/packages/cli/src/services/credentials-tester.service.ts @@ -4,7 +4,7 @@ /* eslint-disable @typescript-eslint/no-unsafe-return */ /* eslint-disable @typescript-eslint/no-unsafe-call */ import get from 'lodash/get'; -import { ErrorReporter, NodeExecuteFunctions } from 'n8n-core'; +import { ErrorReporter, NodeExecuteFunctions, RoutingNode } from 'n8n-core'; import type { ICredentialsDecrypted, ICredentialTestFunction, @@ -23,13 +23,7 @@ import type { ICredentialTestFunctions, IDataObject, } from 'n8n-workflow'; -import { - VersionedNodeType, - NodeHelpers, - RoutingNode, - Workflow, - ApplicationError, -} from 'n8n-workflow'; +import { VersionedNodeType, NodeHelpers, Workflow, ApplicationError } from 'n8n-workflow'; import { Service } from 'typedi'; import { CredentialTypes } from '@/credential-types'; @@ -312,7 +306,6 @@ export class CredentialsTester { runIndex, nodeTypeCopy, { node, data: {}, source: null }, - NodeExecuteFunctions, credentialsDecrypted, ); } catch (error) { diff --git a/packages/cli/src/services/dynamic-node-parameters.service.ts b/packages/cli/src/services/dynamic-node-parameters.service.ts index eb6ecc5f67..a20d63b5fa 100644 --- a/packages/cli/src/services/dynamic-node-parameters.service.ts +++ b/packages/cli/src/services/dynamic-node-parameters.service.ts @@ -1,4 +1,4 @@ -import { LoadOptionsContext, NodeExecuteFunctions } from 'n8n-core'; +import { LoadOptionsContext, RoutingNode } from 'n8n-core'; import type { ILoadOptions, ILoadOptionsFunctions, @@ -18,7 +18,7 @@ import type { NodeParameterValueType, IDataObject, } from 'n8n-workflow'; -import { Workflow, RoutingNode, ApplicationError } from 'n8n-workflow'; +import { Workflow, ApplicationError } from 'n8n-workflow'; import { Service } from 'typedi'; import { NodeTypes } from '@/node-types'; @@ -105,13 +105,11 @@ export class DynamicNodeParametersService { main: [[{ json: {} }]], }; - const optionsData = await routingNode.runNode( - inputData, - runIndex, - tempNode, - { node, source: null, data: {} }, - NodeExecuteFunctions, - ); + const optionsData = await routingNode.runNode(inputData, runIndex, tempNode, { + node, + source: null, + data: {}, + }); if (optionsData?.length === 0) { return []; diff --git a/packages/cli/src/webhooks/__tests__/test-webhooks.test.ts b/packages/cli/src/webhooks/__tests__/test-webhooks.test.ts index 50f5bc2f12..a07d4fc0fd 100644 --- a/packages/cli/src/webhooks/__tests__/test-webhooks.test.ts +++ b/packages/cli/src/webhooks/__tests__/test-webhooks.test.ts @@ -18,6 +18,7 @@ import type { } from '@/webhooks/test-webhook-registrations.service'; import { TestWebhooks } from '@/webhooks/test-webhooks'; import * as WebhookHelpers from '@/webhooks/webhook-helpers'; +import type { WebhookService } from '@/webhooks/webhook.service'; import type { WebhookRequest } from '@/webhooks/webhook.types'; import * as AdditionalData from '@/workflow-execute-additional-data'; @@ -38,13 +39,20 @@ const webhook = mock({ userId, }); -const registrations = mock(); - -let testWebhooks: TestWebhooks; - describe('TestWebhooks', () => { + const registrations = mock(); + const webhookService = mock(); + + const testWebhooks = new TestWebhooks( + mock(), + mock(), + registrations, + mock(), + mock(), + webhookService, + ); + beforeAll(() => { - testWebhooks = new TestWebhooks(mock(), mock(), registrations, mock(), mock()); jest.useFakeTimers(); }); @@ -68,7 +76,7 @@ describe('TestWebhooks', () => { const needsWebhook = await testWebhooks.needsWebhook(args); const [registerOrder] = registrations.register.mock.invocationCallOrder; - const [createOrder] = workflow.createWebhookIfNotExists.mock.invocationCallOrder; + const [createOrder] = webhookService.createWebhookIfNotExists.mock.invocationCallOrder; expect(registerOrder).toBeLessThan(createOrder); expect(needsWebhook).toBe(true); @@ -132,11 +140,11 @@ describe('TestWebhooks', () => { // ASSERT const [registerOrder] = registrations.register.mock.invocationCallOrder; - const [createOrder] = workflow.createWebhookIfNotExists.mock.invocationCallOrder; + const [createOrder] = webhookService.createWebhookIfNotExists.mock.invocationCallOrder; expect(registerOrder).toBeLessThan(createOrder); expect(registrations.register.mock.calls[0][0].webhook.node).toBe(webhook2.node); - expect(workflow.createWebhookIfNotExists.mock.calls[0][0].node).toBe(webhook2.node); + expect(webhookService.createWebhookIfNotExists.mock.calls[0][1].node).toBe(webhook2.node); expect(needsWebhook).toBe(true); }); }); diff --git a/packages/cli/src/webhooks/__tests__/waiting-forms.test.ts b/packages/cli/src/webhooks/__tests__/waiting-forms.test.ts index bec6f95d7f..f342095b77 100644 --- a/packages/cli/src/webhooks/__tests__/waiting-forms.test.ts +++ b/packages/cli/src/webhooks/__tests__/waiting-forms.test.ts @@ -6,7 +6,7 @@ import { WaitingForms } from '@/webhooks/waiting-forms'; describe('WaitingForms', () => { const executionRepository = mock(); - const waitingWebhooks = new WaitingForms(mock(), mock(), executionRepository); + const waitingWebhooks = new WaitingForms(mock(), mock(), executionRepository, mock()); beforeEach(() => { jest.restoreAllMocks(); diff --git a/packages/cli/src/webhooks/__tests__/waiting-webhooks.test.ts b/packages/cli/src/webhooks/__tests__/waiting-webhooks.test.ts index 892d87e773..72fe654c55 100644 --- a/packages/cli/src/webhooks/__tests__/waiting-webhooks.test.ts +++ b/packages/cli/src/webhooks/__tests__/waiting-webhooks.test.ts @@ -10,7 +10,7 @@ import type { WaitingWebhookRequest } from '@/webhooks/webhook.types'; describe('WaitingWebhooks', () => { const executionRepository = mock(); - const waitingWebhooks = new WaitingWebhooks(mock(), mock(), executionRepository); + const waitingWebhooks = new WaitingWebhooks(mock(), mock(), executionRepository, mock()); beforeEach(() => { jest.restoreAllMocks(); diff --git a/packages/cli/src/webhooks/__tests__/webhook.service.test.ts b/packages/cli/src/webhooks/__tests__/webhook.service.test.ts index 534c36bca0..46ffb82dc6 100644 --- a/packages/cli/src/webhooks/__tests__/webhook.service.test.ts +++ b/packages/cli/src/webhooks/__tests__/webhook.service.test.ts @@ -1,11 +1,14 @@ +import { mock } from 'jest-mock-extended'; +import type { INode, INodeType, IWebhookData, IWorkflowExecuteAdditionalData } from 'n8n-workflow'; +import { Workflow } from 'n8n-workflow'; import { v4 as uuid } from 'uuid'; import config from '@/config'; import { WebhookEntity } from '@/databases/entities/webhook-entity'; -import { WebhookRepository } from '@/databases/repositories/webhook.repository'; -import { CacheService } from '@/services/cache/cache.service'; +import type { WebhookRepository } from '@/databases/repositories/webhook.repository'; +import type { NodeTypes } from '@/node-types'; +import type { CacheService } from '@/services/cache/cache.service'; import { WebhookService } from '@/webhooks/webhook.service'; -import { mockInstance } from '@test/mocking'; const createWebhook = (method: string, path: string, webhookId?: string, pathSegments?: number) => Object.assign(new WebhookEntity(), { @@ -16,9 +19,11 @@ const createWebhook = (method: string, path: string, webhookId?: string, pathSeg }) as WebhookEntity; describe('WebhookService', () => { - const webhookRepository = mockInstance(WebhookRepository); - const cacheService = mockInstance(CacheService); - const webhookService = new WebhookService(webhookRepository, cacheService); + const webhookRepository = mock(); + const cacheService = mock(); + const nodeTypes = mock(); + const webhookService = new WebhookService(mock(), webhookRepository, cacheService, nodeTypes); + const additionalData = mock(); beforeEach(() => { config.load(config.default); @@ -188,4 +193,171 @@ describe('WebhookService', () => { expect(webhookRepository.upsert).toHaveBeenCalledWith(mockWebhook, ['method', 'webhookPath']); }); }); + + describe('getNodeWebhooks()', () => { + const workflow = new Workflow({ + id: 'test-workflow', + nodes: [], + connections: {}, + active: true, + nodeTypes, + }); + + test('should return empty array if node is disabled', async () => { + const node = { disabled: true } as INode; + + const webhooks = webhookService.getNodeWebhooks(workflow, node, additionalData); + + expect(webhooks).toEqual([]); + }); + + test('should return webhooks for node with webhook definitions', async () => { + const node = { + name: 'Webhook', + type: 'n8n-nodes-base.webhook', + disabled: false, + } as INode; + + const nodeType = { + description: { + webhooks: [ + { + name: 'default', + httpMethod: 'GET', + path: '/webhook', + isFullPath: false, + restartWebhook: false, + }, + ], + }, + } as INodeType; + + nodeTypes.getByNameAndVersion.mockReturnValue(nodeType); + + const webhooks = webhookService.getNodeWebhooks(workflow, node, additionalData); + + expect(webhooks).toHaveLength(1); + expect(webhooks[0]).toMatchObject({ + httpMethod: 'GET', + node: 'Webhook', + workflowId: 'test-workflow', + }); + }); + }); + + describe('createWebhookIfNotExists()', () => { + const workflow = new Workflow({ + id: 'test-workflow', + nodes: [ + mock({ + name: 'Webhook', + type: 'n8n-nodes-base.webhook', + typeVersion: 1, + parameters: {}, + }), + ], + connections: {}, + active: false, + nodeTypes, + }); + + const webhookData = mock({ + node: 'Webhook', + webhookDescription: { + name: 'default', + httpMethod: 'GET', + path: '/webhook', + }, + }); + + const defaultWebhookMethods = { + checkExists: jest.fn(), + create: jest.fn(), + }; + + const nodeType = mock({ + webhookMethods: { default: defaultWebhookMethods }, + }); + + test('should create webhook if it does not exist', async () => { + defaultWebhookMethods.checkExists.mockResolvedValue(false); + defaultWebhookMethods.create.mockResolvedValue(true); + nodeTypes.getByNameAndVersion.mockReturnValue(nodeType); + + await webhookService.createWebhookIfNotExists(workflow, webhookData, 'trigger', 'init'); + + expect(defaultWebhookMethods.checkExists).toHaveBeenCalled(); + expect(defaultWebhookMethods.create).toHaveBeenCalled(); + }); + + test('should not create webhook if it already exists', async () => { + defaultWebhookMethods.checkExists.mockResolvedValue(true); + nodeTypes.getByNameAndVersion.mockReturnValue(nodeType); + + await webhookService.createWebhookIfNotExists(workflow, webhookData, 'trigger', 'init'); + + expect(defaultWebhookMethods.checkExists).toHaveBeenCalled(); + expect(defaultWebhookMethods.create).not.toHaveBeenCalled(); + }); + + test('should handle case when webhook methods are not defined', async () => { + nodeTypes.getByNameAndVersion.mockReturnValue({} as INodeType); + + await webhookService.createWebhookIfNotExists(workflow, webhookData, 'trigger', 'init'); + // Test passes if no error is thrown when webhook methods are undefined + }); + }); + + describe('deleteWebhook()', () => { + test('should call runWebhookMethod with delete', async () => { + const workflow = mock(); + const webhookData = mock(); + const runWebhookMethodSpy = jest.spyOn(webhookService as any, 'runWebhookMethod'); + + await webhookService.deleteWebhook(workflow, webhookData, 'trigger', 'init'); + + expect(runWebhookMethodSpy).toHaveBeenCalledWith( + 'delete', + workflow, + webhookData, + 'trigger', + 'init', + ); + }); + }); + + describe('runWebhook()', () => { + const workflow = mock(); + const webhookData = mock(); + const node = mock(); + const responseData = { workflowData: [] }; + + test('should throw error if node does not have webhooks', async () => { + const nodeType = {} as INodeType; + nodeTypes.getByNameAndVersion.mockReturnValue(nodeType); + + await expect( + webhookService.runWebhook(workflow, webhookData, node, additionalData, 'trigger', null), + ).rejects.toThrow('Node does not have any webhooks defined'); + }); + + test('should execute webhook and return response data', async () => { + const nodeType = mock({ + webhook: jest.fn().mockResolvedValue(responseData), + }); + nodeTypes.getByNameAndVersion.mockReturnValue(nodeType); + + const result = await webhookService.runWebhook( + workflow, + webhookData, + node, + additionalData, + 'trigger', + null, + ); + + expect(result).toEqual(responseData); + expect(nodeType.webhook).toHaveBeenCalled(); + }); + }); }); diff --git a/packages/cli/src/webhooks/live-webhooks.ts b/packages/cli/src/webhooks/live-webhooks.ts index 458701caee..6d6fc9161d 100644 --- a/packages/cli/src/webhooks/live-webhooks.ts +++ b/packages/cli/src/webhooks/live-webhooks.ts @@ -1,5 +1,5 @@ import type { Response } from 'express'; -import { Workflow, NodeHelpers, CHAT_TRIGGER_NODE_TYPE } from 'n8n-workflow'; +import { Workflow, CHAT_TRIGGER_NODE_TYPE } from 'n8n-workflow'; import type { INode, IWebhookData, IHttpRequestMethods } from 'n8n-workflow'; import { Service } from 'typedi'; @@ -114,11 +114,9 @@ export class LiveWebhooks implements IWebhookManager { const additionalData = await WorkflowExecuteAdditionalData.getBase(); - const webhookData = NodeHelpers.getNodeWebhooks( - workflow, - workflow.getNode(webhook.node) as INode, - additionalData, - ).find((w) => w.httpMethod === httpMethod && w.path === webhook.webhookPath) as IWebhookData; + const webhookData = this.webhookService + .getNodeWebhooks(workflow, workflow.getNode(webhook.node) as INode, additionalData) + .find((w) => w.httpMethod === httpMethod && w.path === webhook.webhookPath) as IWebhookData; // Get the node which has the webhook defined to know where to start from and to // get additional data diff --git a/packages/cli/src/webhooks/test-webhooks.ts b/packages/cli/src/webhooks/test-webhooks.ts index 8c742946c5..ad642a17c3 100644 --- a/packages/cli/src/webhooks/test-webhooks.ts +++ b/packages/cli/src/webhooks/test-webhooks.ts @@ -1,5 +1,4 @@ import type express from 'express'; -import * as NodeExecuteFunctions from 'n8n-core'; import { InstanceSettings } from 'n8n-core'; import { WebhookPathTakenError, Workflow } from 'n8n-workflow'; import type { @@ -25,6 +24,7 @@ import * as WebhookHelpers from '@/webhooks/webhook-helpers'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; import type { WorkflowRequest } from '@/workflows/workflow.request'; +import { WebhookService } from './webhook.service'; import type { IWebhookResponseCallbackData, IWebhookManager, @@ -44,6 +44,7 @@ export class TestWebhooks implements IWebhookManager { private readonly registrations: TestWebhookRegistrationsService, private readonly instanceSettings: InstanceSettings, private readonly publisher: Publisher, + private readonly webhookService: WebhookService, ) {} private timeouts: { [webhookKey: string]: NodeJS.Timeout } = {}; @@ -314,7 +315,7 @@ export class TestWebhooks implements IWebhookManager { */ await this.registrations.register(registration); - await workflow.createWebhookIfNotExists(webhook, NodeExecuteFunctions, 'manual', 'manual'); + await this.webhookService.createWebhookIfNotExists(workflow, webhook, 'manual', 'manual'); cacheableWebhook.staticData = workflow.staticData; @@ -431,7 +432,7 @@ export class TestWebhooks implements IWebhookManager { if (staticData) workflow.staticData = staticData; - await workflow.deleteWebhook(webhook, NodeExecuteFunctions, 'internal', 'update'); + await this.webhookService.deleteWebhook(workflow, webhook, 'internal', 'update'); } await this.registrations.deregisterAll(); diff --git a/packages/cli/src/webhooks/waiting-webhooks.ts b/packages/cli/src/webhooks/waiting-webhooks.ts index 3176bbdf2d..6355709189 100644 --- a/packages/cli/src/webhooks/waiting-webhooks.ts +++ b/packages/cli/src/webhooks/waiting-webhooks.ts @@ -3,7 +3,6 @@ import { FORM_NODE_TYPE, type INodes, type IWorkflowBase, - NodeHelpers, SEND_AND_WAIT_OPERATION, WAIT_NODE_TYPE, Workflow, @@ -19,6 +18,7 @@ import { NodeTypes } from '@/node-types'; import * as WebhookHelpers from '@/webhooks/webhook-helpers'; import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-data'; +import { WebhookService } from './webhook.service'; import type { IWebhookResponseCallbackData, IWebhookManager, @@ -38,6 +38,7 @@ export class WaitingWebhooks implements IWebhookManager { protected readonly logger: Logger, protected readonly nodeTypes: NodeTypes, private readonly executionRepository: ExecutionRepository, + private readonly webhookService: WebhookService, ) {} // TODO: implement `getWebhookMethods` for CORS support @@ -164,17 +165,15 @@ export class WaitingWebhooks implements IWebhookManager { } const additionalData = await WorkflowExecuteAdditionalData.getBase(); - const webhookData = NodeHelpers.getNodeWebhooks( - workflow, - workflowStartNode, - additionalData, - ).find( - (webhook) => - webhook.httpMethod === req.method && - webhook.path === (suffix ?? '') && - webhook.webhookDescription.restartWebhook === true && - (webhook.webhookDescription.isForm || false) === this.includeForms, - ); + const webhookData = this.webhookService + .getNodeWebhooks(workflow, workflowStartNode, additionalData) + .find( + (webhook) => + webhook.httpMethod === req.method && + webhook.path === (suffix ?? '') && + webhook.webhookDescription.restartWebhook === true && + (webhook.webhookDescription.isForm || false) === this.includeForms, + ); if (webhookData === undefined) { // If no data got found it means that the execution can not be started via a webhook. diff --git a/packages/cli/src/webhooks/webhook-helpers.ts b/packages/cli/src/webhooks/webhook-helpers.ts index bc38f63fb4..6657089881 100644 --- a/packages/cli/src/webhooks/webhook-helpers.ts +++ b/packages/cli/src/webhooks/webhook-helpers.ts @@ -9,7 +9,7 @@ import { GlobalConfig } from '@n8n/config'; import type express from 'express'; import get from 'lodash/get'; -import { BinaryDataService, ErrorReporter, NodeExecuteFunctions } from 'n8n-core'; +import { BinaryDataService, ErrorReporter } from 'n8n-core'; import type { IBinaryData, IBinaryKeyData, @@ -35,7 +35,6 @@ import { createDeferredPromise, ExecutionCancelledError, FORM_NODE_TYPE, - NodeHelpers, NodeOperationError, } from 'n8n-workflow'; import { finished } from 'stream/promises'; @@ -57,6 +56,7 @@ import * as WorkflowExecuteAdditionalData from '@/workflow-execute-additional-da import * as WorkflowHelpers from '@/workflow-helpers'; import { WorkflowRunner } from '@/workflow-runner'; +import { WebhookService } from './webhook.service'; import type { IWebhookResponseCallbackData, WebhookRequest } from './webhook.types'; /** @@ -88,7 +88,12 @@ export function getWorkflowWebhooks( } returnData.push.apply( returnData, - NodeHelpers.getNodeWebhooks(workflow, node, additionalData, ignoreRestartWebhooks), + Container.get(WebhookService).getNodeWebhooks( + workflow, + node, + additionalData, + ignoreRestartWebhooks, + ), ); } @@ -254,11 +259,11 @@ export async function executeWebhook( } try { - webhookResultData = await workflow.runWebhook( + webhookResultData = await Container.get(WebhookService).runWebhook( + workflow, webhookData, workflowStartNode, additionalData, - NodeExecuteFunctions, executionMode, runExecutionData ?? null, ); diff --git a/packages/cli/src/webhooks/webhook.service.ts b/packages/cli/src/webhooks/webhook.service.ts index 8e72f0abf9..80b12b04cd 100644 --- a/packages/cli/src/webhooks/webhook.service.ts +++ b/packages/cli/src/webhooks/webhook.service.ts @@ -1,8 +1,23 @@ -import type { IHttpRequestMethods } from 'n8n-workflow'; +import { HookContext, WebhookContext } from 'n8n-core'; +import { ApplicationError, Node, NodeHelpers } from 'n8n-workflow'; +import type { + IHttpRequestMethods, + INode, + IRunExecutionData, + IWebhookData, + IWebhookResponseData, + IWorkflowExecuteAdditionalData, + WebhookSetupMethodNames, + Workflow, + WorkflowActivateMode, + WorkflowExecuteMode, +} from 'n8n-workflow'; import { Service } from 'typedi'; import type { WebhookEntity } from '@/databases/entities/webhook-entity'; import { WebhookRepository } from '@/databases/repositories/webhook.repository'; +import { Logger } from '@/logging/logger.service'; +import { NodeTypes } from '@/node-types'; import { CacheService } from '@/services/cache/cache.service'; type Method = NonNullable; @@ -10,8 +25,10 @@ type Method = NonNullable; @Service() export class WebhookService { constructor( - private webhookRepository: WebhookRepository, - private cacheService: CacheService, + private readonly logger: Logger, + private readonly webhookRepository: WebhookRepository, + private readonly cacheService: CacheService, + private readonly nodeTypes: NodeTypes, ) {} async populateCache() { @@ -118,4 +135,210 @@ export class WebhookService { .find({ select: ['method'], where: { webhookPath: path } }) .then((rows) => rows.map((r) => r.method)); } + + /** + * Returns all the webhooks which should be created for the give node + */ + getNodeWebhooks( + workflow: Workflow, + node: INode, + additionalData: IWorkflowExecuteAdditionalData, + ignoreRestartWebhooks = false, + ): IWebhookData[] { + if (node.disabled === true) { + // Node is disabled so webhooks will also not be enabled + return []; + } + + const nodeType = this.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); + + if (nodeType.description.webhooks === undefined) { + // Node does not have any webhooks so return + return []; + } + + const workflowId = workflow.id || '__UNSAVED__'; + const mode = 'internal'; + + const returnData: IWebhookData[] = []; + for (const webhookDescription of nodeType.description.webhooks) { + if (ignoreRestartWebhooks && webhookDescription.restartWebhook === true) { + continue; + } + + let nodeWebhookPath = workflow.expression.getSimpleParameterValue( + node, + webhookDescription.path, + mode, + {}, + ); + if (nodeWebhookPath === undefined) { + this.logger.error( + `No webhook path could be found for node "${node.name}" in workflow "${workflowId}".`, + ); + continue; + } + + nodeWebhookPath = nodeWebhookPath.toString(); + + if (nodeWebhookPath.startsWith('/')) { + nodeWebhookPath = nodeWebhookPath.slice(1); + } + if (nodeWebhookPath.endsWith('/')) { + nodeWebhookPath = nodeWebhookPath.slice(0, -1); + } + + const isFullPath: boolean = workflow.expression.getSimpleParameterValue( + node, + webhookDescription.isFullPath, + 'internal', + {}, + undefined, + false, + ) as boolean; + const restartWebhook: boolean = workflow.expression.getSimpleParameterValue( + node, + webhookDescription.restartWebhook, + 'internal', + {}, + undefined, + false, + ) as boolean; + const path = NodeHelpers.getNodeWebhookPath( + workflowId, + node, + nodeWebhookPath, + isFullPath, + restartWebhook, + ); + + const webhookMethods = workflow.expression.getSimpleParameterValue( + node, + webhookDescription.httpMethod, + mode, + {}, + undefined, + 'GET', + ); + + if (webhookMethods === undefined) { + this.logger.error( + `The webhook "${path}" for node "${node.name}" in workflow "${workflowId}" could not be added because the httpMethod is not defined.`, + ); + continue; + } + + let webhookId: string | undefined; + if ((path.startsWith(':') || path.includes('/:')) && node.webhookId) { + webhookId = node.webhookId; + } + + String(webhookMethods) + .split(',') + .forEach((httpMethod) => { + if (!httpMethod) return; + returnData.push({ + httpMethod: httpMethod.trim() as IHttpRequestMethods, + node: node.name, + path, + webhookDescription, + workflowId, + workflowExecuteAdditionalData: additionalData, + webhookId, + }); + }); + } + + return returnData; + } + + async createWebhookIfNotExists( + workflow: Workflow, + webhookData: IWebhookData, + mode: WorkflowExecuteMode, + activation: WorkflowActivateMode, + ): Promise { + const webhookExists = await this.runWebhookMethod( + 'checkExists', + workflow, + webhookData, + mode, + activation, + ); + if (!webhookExists) { + // If webhook does not exist yet create it + await this.runWebhookMethod('create', workflow, webhookData, mode, activation); + } + } + + async deleteWebhook( + workflow: Workflow, + webhookData: IWebhookData, + mode: WorkflowExecuteMode, + activation: WorkflowActivateMode, + ) { + await this.runWebhookMethod('delete', workflow, webhookData, mode, activation); + } + + private async runWebhookMethod( + method: WebhookSetupMethodNames, + workflow: Workflow, + webhookData: IWebhookData, + mode: WorkflowExecuteMode, + activation: WorkflowActivateMode, + ): Promise { + const node = workflow.getNode(webhookData.node); + + if (!node) return; + + const nodeType = this.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); + + const webhookFn = nodeType.webhookMethods?.[webhookData.webhookDescription.name]?.[method]; + if (webhookFn === undefined) return; + + const context = new HookContext( + workflow, + node, + webhookData.workflowExecuteAdditionalData, + mode, + activation, + webhookData, + ); + + return (await webhookFn.call(context)) as boolean; + } + + /** + * Executes the webhook data to see what it should return and if the + * workflow should be started or not + */ + async runWebhook( + workflow: Workflow, + webhookData: IWebhookData, + node: INode, + additionalData: IWorkflowExecuteAdditionalData, + mode: WorkflowExecuteMode, + runExecutionData: IRunExecutionData | null, + ): Promise { + const nodeType = this.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); + if (nodeType.webhook === undefined) { + throw new ApplicationError('Node does not have any webhooks defined', { + extra: { nodeName: node.name }, + }); + } + + const context = new WebhookContext( + workflow, + node, + additionalData, + mode, + webhookData, + [], + runExecutionData ?? null, + ); + + return nodeType instanceof Node + ? await nodeType.webhook(context) + : ((await nodeType.webhook.call(context)) as IWebhookResponseData); + } } diff --git a/packages/cli/test/integration/active-workflow-manager.test.ts b/packages/cli/test/integration/active-workflow-manager.test.ts index d61a97ade7..a3e4f657f2 100644 --- a/packages/cli/test/integration/active-workflow-manager.test.ts +++ b/packages/cli/test/integration/active-workflow-manager.test.ts @@ -1,5 +1,4 @@ import { mock } from 'jest-mock-extended'; -import type { InstanceSettings } from 'n8n-core'; import { NodeApiError, Workflow } from 'n8n-workflow'; import type { IWebhookData, WorkflowActivateMode } from 'n8n-workflow'; import { Container } from 'typedi'; @@ -10,6 +9,7 @@ import type { WebhookEntity } from '@/databases/entities/webhook-entity'; import type { WorkflowEntity } from '@/databases/entities/workflow-entity'; import { ExecutionService } from '@/executions/execution.service'; import { ExternalHooks } from '@/external-hooks'; +import { Logger } from '@/logging/logger.service'; import { NodeTypes } from '@/node-types'; import { Push } from '@/push'; import { SecretsHelper } from '@/secrets-helpers'; @@ -25,6 +25,7 @@ import * as utils from './shared/utils/'; import { mockInstance } from '../shared/mocking'; mockInstance(ActiveExecutions); +mockInstance(Logger); mockInstance(Push); mockInstance(SecretsHelper); mockInstance(ExecutionService); @@ -85,7 +86,7 @@ describe('init()', () => { await Promise.all([createActiveWorkflow(), createActiveWorkflow()]); const checkSpy = jest - .spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated') + .spyOn(activeWorkflowManager, 'checkIfWorkflowCanBeActivated') .mockReturnValue(true); await activeWorkflowManager.init(); @@ -166,7 +167,6 @@ describe('remove()', () => { it('should remove all webhooks of a workflow from external service', async () => { const dbWorkflow = await createActiveWorkflow(); - const deleteWebhookSpy = jest.spyOn(Workflow.prototype, 'deleteWebhook'); jest .spyOn(WebhookHelpers, 'getWorkflowWebhooks') .mockReturnValue([mock({ path: 'some-path' })]); @@ -174,7 +174,7 @@ describe('remove()', () => { await activeWorkflowManager.init(); await activeWorkflowManager.remove(dbWorkflow.id); - expect(deleteWebhookSpy).toHaveBeenCalledTimes(1); + expect(webhookService.deleteWebhook).toHaveBeenCalledTimes(1); }); it('should stop running triggers and pollers', async () => { @@ -258,82 +258,11 @@ describe('addWebhooks()', () => { const [node] = dbWorkflow.nodes; jest.spyOn(Workflow.prototype, 'getNode').mockReturnValue(node); - jest.spyOn(Workflow.prototype, 'checkIfWorkflowCanBeActivated').mockReturnValue(true); - jest.spyOn(Workflow.prototype, 'createWebhookIfNotExists').mockResolvedValue(undefined); + jest.spyOn(activeWorkflowManager, 'checkIfWorkflowCanBeActivated').mockReturnValue(true); + webhookService.createWebhookIfNotExists.mockResolvedValue(undefined); await activeWorkflowManager.addWebhooks(workflow, additionalData, 'trigger', 'init'); expect(webhookService.storeWebhook).toHaveBeenCalledTimes(1); }); }); - -describe('shouldAddWebhooks', () => { - describe('if leader', () => { - const activeWorkflowManager = new ActiveWorkflowManager( - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - mock({ isLeader: true, isFollower: false }), - mock(), - ); - - test('should return `true` for `init`', () => { - // ensure webhooks are populated on init: https://github.com/n8n-io/n8n/pull/8830 - const result = activeWorkflowManager.shouldAddWebhooks('init'); - expect(result).toBe(true); - }); - - test('should return `false` for `leadershipChange`', () => { - const result = activeWorkflowManager.shouldAddWebhooks('leadershipChange'); - expect(result).toBe(false); - }); - - test('should return `true` for `update` or `activate`', () => { - const modes = ['update', 'activate'] as WorkflowActivateMode[]; - for (const mode of modes) { - const result = activeWorkflowManager.shouldAddWebhooks(mode); - expect(result).toBe(true); - } - }); - }); - - describe('if follower', () => { - const activeWorkflowManager = new ActiveWorkflowManager( - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - mock(), - mock({ isLeader: false, isFollower: true }), - mock(), - ); - - test('should return `false` for `update` or `activate`', () => { - const modes = ['update', 'activate'] as WorkflowActivateMode[]; - for (const mode of modes) { - const result = activeWorkflowManager.shouldAddWebhooks(mode); - expect(result).toBe(false); - } - }); - }); -}); diff --git a/packages/core/src/ActiveWorkflows.ts b/packages/core/src/ActiveWorkflows.ts index de919b0649..b7604f9778 100644 --- a/packages/core/src/ActiveWorkflows.ts +++ b/packages/core/src/ActiveWorkflows.ts @@ -22,11 +22,13 @@ import { Service } from 'typedi'; import { ErrorReporter } from './error-reporter'; import type { IWorkflowData } from './Interfaces'; import { ScheduledTaskManager } from './ScheduledTaskManager'; +import { TriggersAndPollers } from './TriggersAndPollers'; @Service() export class ActiveWorkflows { constructor( private readonly scheduledTaskManager: ScheduledTaskManager, + private readonly triggersAndPollers: TriggersAndPollers, private readonly errorReporter: ErrorReporter, ) {} @@ -78,7 +80,8 @@ export class ActiveWorkflows { for (const triggerNode of triggerNodes) { try { - triggerResponse = await workflow.runTrigger( + triggerResponse = await this.triggersAndPollers.runTrigger( + workflow, triggerNode, getTriggerFunctions, additionalData, @@ -153,7 +156,7 @@ export class ActiveWorkflows { }); try { - const pollResponse = await workflow.runPoll(node, pollFunctions); + const pollResponse = await this.triggersAndPollers.runPoll(workflow, node, pollFunctions); if (pollResponse !== null) { pollFunctions.__emit(pollResponse); diff --git a/packages/core/src/Constants.ts b/packages/core/src/Constants.ts index d5fc12bffd..ceaf77566a 100644 --- a/packages/core/src/Constants.ts +++ b/packages/core/src/Constants.ts @@ -1,3 +1,6 @@ +import type { INodeProperties } from 'n8n-workflow'; +import { cronNodeOptions } from 'n8n-workflow'; + export const CUSTOM_EXTENSION_ENV = 'N8N_CUSTOM_EXTENSIONS'; export const PLACEHOLDER_EMPTY_EXECUTION_ID = '__UNKNOWN__'; export const PLACEHOLDER_EMPTY_WORKFLOW_ID = '__EMPTY__'; @@ -12,3 +15,30 @@ export const CONFIG_FILES = 'N8N_CONFIG_FILES'; export const BINARY_DATA_STORAGE_PATH = 'N8N_BINARY_DATA_STORAGE_PATH'; export const UM_EMAIL_TEMPLATES_INVITE = 'N8N_UM_EMAIL_TEMPLATES_INVITE'; export const UM_EMAIL_TEMPLATES_PWRESET = 'N8N_UM_EMAIL_TEMPLATES_PWRESET'; + +export const commonPollingParameters: INodeProperties[] = [ + { + displayName: 'Poll Times', + name: 'pollTimes', + type: 'fixedCollection', + typeOptions: { + multipleValues: true, + multipleValueButtonText: 'Add Poll Time', + }, + default: { item: [{ mode: 'everyMinute' }] }, + description: 'Time at which polling should occur', + placeholder: 'Add Poll Time', + options: cronNodeOptions, + }, +]; + +export const commonCORSParameters: INodeProperties[] = [ + { + displayName: 'Allowed Origins (CORS)', + name: 'allowedOrigins', + type: 'string', + default: '*', + description: + 'Comma-separated list of URLs allowed for cross-origin non-preflight requests. Use * (default) to allow all origins.', + }, +]; diff --git a/packages/core/src/DirectoryLoader.ts b/packages/core/src/DirectoryLoader.ts index d73fd16080..cd223da2dd 100644 --- a/packages/core/src/DirectoryLoader.ts +++ b/packages/core/src/DirectoryLoader.ts @@ -6,6 +6,7 @@ import type { ICredentialType, ICredentialTypeData, INodeCredentialDescription, + INodePropertyOptions, INodeType, INodeTypeBaseDescription, INodeTypeData, @@ -14,13 +15,18 @@ import type { IVersionedNodeType, KnownNodesAndCredentials, } from 'n8n-workflow'; -import { ApplicationError, LoggerProxy as Logger, NodeHelpers, jsonParse } from 'n8n-workflow'; +import { + ApplicationError, + LoggerProxy as Logger, + applyDeclarativeNodeOptionParameters, + jsonParse, +} from 'n8n-workflow'; import { readFileSync } from 'node:fs'; import { readFile } from 'node:fs/promises'; import * as path from 'path'; import { loadClassInIsolation } from './ClassLoader'; -import { CUSTOM_NODES_CATEGORY } from './Constants'; +import { commonCORSParameters, commonPollingParameters, CUSTOM_NODES_CATEGORY } from './Constants'; import { UnrecognizedCredentialTypeError } from './errors/unrecognized-credential-type.error'; import { UnrecognizedNodeTypeError } from './errors/unrecognized-node-type.error'; import type { n8n } from './Interfaces'; @@ -135,7 +141,7 @@ export abstract class DirectoryLoader { for (const version of Object.values(tempNode.nodeVersions)) { this.addLoadOptionsMethods(version); - NodeHelpers.applySpecialNodeParameters(version); + this.applySpecialNodeParameters(version); } const currentVersionNode = tempNode.nodeVersions[tempNode.currentVersion]; @@ -150,7 +156,7 @@ export abstract class DirectoryLoader { } } else { this.addLoadOptionsMethods(tempNode); - NodeHelpers.applySpecialNodeParameters(tempNode); + this.applySpecialNodeParameters(tempNode); // Short renaming to avoid type issues nodeVersion = Array.isArray(tempNode.description.version) @@ -346,6 +352,24 @@ export abstract class DirectoryLoader { } } + private applySpecialNodeParameters(nodeType: INodeType): void { + const { properties, polling, supportsCORS } = nodeType.description; + if (polling) { + properties.unshift(...commonPollingParameters); + } + if (nodeType.webhook && supportsCORS) { + const optionsProperty = properties.find(({ name }) => name === 'options'); + if (optionsProperty) + optionsProperty.options = [ + ...commonCORSParameters, + ...(optionsProperty.options as INodePropertyOptions[]), + ]; + else properties.push(...commonCORSParameters); + } + + applyDeclarativeNodeOptionParameters(nodeType); + } + private getIconPath(icon: string, filePath: string) { const iconPath = path.join(path.dirname(filePath), icon.replace('file:', '')); return `icons/${this.packageName}/${iconPath}`; diff --git a/packages/core/src/NodeExecuteFunctions.ts b/packages/core/src/NodeExecuteFunctions.ts index 4967617c9b..047c4e23ad 100644 --- a/packages/core/src/NodeExecuteFunctions.ts +++ b/packages/core/src/NodeExecuteFunctions.ts @@ -41,8 +41,6 @@ import type { IDataObject, IExecuteData, IExecuteFunctions, - IExecuteSingleFunctions, - IHookFunctions, IHttpRequestOptions, IN8nHttpFullResponse, IN8nHttpResponse, @@ -56,9 +54,7 @@ import type { IRunExecutionData, ITaskDataConnections, ITriggerFunctions, - IWebhookData, IWebhookDescription, - IWebhookFunctions, IWorkflowDataProxyAdditionalKeys, IWorkflowExecuteAdditionalData, NodeExecutionWithMetadata, @@ -121,15 +117,7 @@ import { DataDeduplicationService } from './data-deduplication-service'; import { InstanceSettings } from './InstanceSettings'; import type { IResponseError } from './Interfaces'; // eslint-disable-next-line import/no-cycle -import { - ExecuteContext, - ExecuteSingleContext, - HookContext, - PollContext, - SupplyDataContext, - TriggerContext, - WebhookContext, -} from './node-execution-context'; +import { PollContext, SupplyDataContext, TriggerContext } from './node-execution-context'; import { ScheduledTaskManager } from './ScheduledTaskManager'; import { SSHClientsManager } from './SSHClientsManager'; @@ -2720,68 +2708,6 @@ export function getExecuteTriggerFunctions( return new TriggerContext(workflow, node, additionalData, mode, activation); } -/** - * Returns the execute functions regular nodes have access to. - */ -export function getExecuteFunctions( - workflow: Workflow, - runExecutionData: IRunExecutionData, - runIndex: number, - connectionInputData: INodeExecutionData[], - inputData: ITaskDataConnections, - node: INode, - additionalData: IWorkflowExecuteAdditionalData, - executeData: IExecuteData, - mode: WorkflowExecuteMode, - closeFunctions: CloseFunction[], - abortSignal?: AbortSignal, -): IExecuteFunctions { - return new ExecuteContext( - workflow, - node, - additionalData, - mode, - runExecutionData, - runIndex, - connectionInputData, - inputData, - executeData, - closeFunctions, - abortSignal, - ); -} - -/** - * Returns the execute functions regular nodes have access to when single-function is defined. - */ -export function getExecuteSingleFunctions( - workflow: Workflow, - runExecutionData: IRunExecutionData, - runIndex: number, - connectionInputData: INodeExecutionData[], - inputData: ITaskDataConnections, - node: INode, - itemIndex: number, - additionalData: IWorkflowExecuteAdditionalData, - executeData: IExecuteData, - mode: WorkflowExecuteMode, - abortSignal?: AbortSignal, -): IExecuteSingleFunctions { - return new ExecuteSingleContext( - workflow, - node, - additionalData, - mode, - runExecutionData, - runIndex, - connectionInputData, - inputData, - itemIndex, - executeData, - abortSignal, - ); -} - export function getCredentialTestFunctions(): ICredentialTestFunctions { return { helpers: { @@ -2792,41 +2718,3 @@ export function getCredentialTestFunctions(): ICredentialTestFunctions { }, }; } - -/** - * Returns the execute functions regular nodes have access to in hook-function. - */ -export function getExecuteHookFunctions( - workflow: Workflow, - node: INode, - additionalData: IWorkflowExecuteAdditionalData, - mode: WorkflowExecuteMode, - activation: WorkflowActivateMode, - webhookData?: IWebhookData, -): IHookFunctions { - return new HookContext(workflow, node, additionalData, mode, activation, webhookData); -} - -/** - * Returns the execute functions regular nodes have access to when webhook-function is defined. - */ -// TODO: check where it is used and make sure close functions are called -export function getExecuteWebhookFunctions( - workflow: Workflow, - node: INode, - additionalData: IWorkflowExecuteAdditionalData, - mode: WorkflowExecuteMode, - webhookData: IWebhookData, - closeFunctions: CloseFunction[], - runExecutionData: IRunExecutionData | null, -): IWebhookFunctions { - return new WebhookContext( - workflow, - node, - additionalData, - mode, - webhookData, - closeFunctions, - runExecutionData, - ); -} diff --git a/packages/workflow/src/RoutingNode.ts b/packages/core/src/RoutingNode.ts similarity index 97% rename from packages/workflow/src/RoutingNode.ts rename to packages/core/src/RoutingNode.ts index db3b180972..1c041735d8 100644 --- a/packages/workflow/src/RoutingNode.ts +++ b/packages/core/src/RoutingNode.ts @@ -1,25 +1,18 @@ /* eslint-disable @typescript-eslint/no-unsafe-call */ /* eslint-disable @typescript-eslint/no-unsafe-argument */ - /* eslint-disable @typescript-eslint/prefer-nullish-coalescing */ - /* eslint-disable @typescript-eslint/no-unsafe-member-access */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ - import get from 'lodash/get'; import merge from 'lodash/merge'; import set from 'lodash/set'; -import url from 'node:url'; - -import { NodeApiError } from './errors/node-api.error'; -import { NodeOperationError } from './errors/node-operation.error'; +import { NodeHelpers, NodeApiError, NodeOperationError, sleep } from 'n8n-workflow'; import type { ICredentialDataDecryptedObject, ICredentialsDecrypted, IHttpRequestOptions, IN8nHttpFullResponse, INode, - INodeExecuteFunctions, INodeExecutionData, INodeParameters, INodePropertyOptions, @@ -43,10 +36,11 @@ import type { CloseFunction, INodeCredentialDescription, IExecutePaginationFunctions, -} from './Interfaces'; -import * as NodeHelpers from './NodeHelpers'; -import { sleep } from './utils'; -import type { Workflow } from './Workflow'; + Workflow, +} from 'n8n-workflow'; +import url from 'node:url'; + +import { ExecuteContext, ExecuteSingleContext } from './node-execution-context'; export class RoutingNode { additionalData: IWorkflowExecuteAdditionalData; @@ -83,7 +77,6 @@ export class RoutingNode { runIndex: number, nodeType: INodeType, executeData: IExecuteData, - nodeExecuteFunctions: INodeExecuteFunctions, credentialsDecrypted?: ICredentialsDecrypted, abortSignal?: AbortSignal, ): Promise { @@ -91,16 +84,16 @@ export class RoutingNode { const returnData: INodeExecutionData[] = []; const closeFunctions: CloseFunction[] = []; - const executeFunctions = nodeExecuteFunctions.getExecuteFunctions( + const executeFunctions = new ExecuteContext( this.workflow, + this.node, + this.additionalData, + this.mode, this.runExecutionData, runIndex, this.connectionInputData, inputData, - this.node, - this.additionalData, executeData, - this.mode, closeFunctions, abortSignal, ); @@ -136,6 +129,7 @@ export class RoutingNode { credentials = (await executeFunctions.getCredentials( credentialDescription.name, + 0, )) || {}; } catch (error) { if (credentialDescription.required) { @@ -168,20 +162,22 @@ export class RoutingNode { } } + const thisArgs = new ExecuteSingleContext( + this.workflow, + this.node, + this.additionalData, + this.mode, + this.runExecutionData, + runIndex, + this.connectionInputData, + inputData, + itemIndex, + executeData, + abortSignal, + ); + itemContext.push({ - thisArgs: nodeExecuteFunctions.getExecuteSingleFunctions( - this.workflow, - this.runExecutionData, - runIndex, - this.connectionInputData, - inputData, - this.node, - itemIndex, - this.additionalData, - executeData, - this.mode, - abortSignal, - ), + thisArgs, requestData: { options: { qs: {}, @@ -308,6 +304,7 @@ export class RoutingNode { } const promisesResponses = await Promise.allSettled(requestPromises); + // eslint-disable-next-line @typescript-eslint/no-explicit-any let responseData: any; for (let itemIndex = 0; itemIndex < items.length; itemIndex++) { responseData = promisesResponses.shift(); diff --git a/packages/core/src/TriggersAndPollers.ts b/packages/core/src/TriggersAndPollers.ts new file mode 100644 index 0000000000..b77926e136 --- /dev/null +++ b/packages/core/src/TriggersAndPollers.ts @@ -0,0 +1,116 @@ +import { ApplicationError } from 'n8n-workflow'; +import type { + Workflow, + INode, + INodeExecutionData, + IPollFunctions, + IGetExecuteTriggerFunctions, + IWorkflowExecuteAdditionalData, + WorkflowExecuteMode, + WorkflowActivateMode, + ITriggerResponse, + IDeferredPromise, + IExecuteResponsePromiseData, + IRun, +} from 'n8n-workflow'; +import { Service } from 'typedi'; + +@Service() +export class TriggersAndPollers { + /** + * Runs the given trigger node so that it can trigger the workflow when the node has data. + */ + async runTrigger( + workflow: Workflow, + node: INode, + getTriggerFunctions: IGetExecuteTriggerFunctions, + additionalData: IWorkflowExecuteAdditionalData, + mode: WorkflowExecuteMode, + activation: WorkflowActivateMode, + ): Promise { + const triggerFunctions = getTriggerFunctions(workflow, node, additionalData, mode, activation); + + const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); + + if (!nodeType.trigger) { + throw new ApplicationError('Node type does not have a trigger function defined', { + extra: { nodeName: node.name }, + tags: { nodeType: node.type }, + }); + } + + if (mode === 'manual') { + // In manual mode we do not just start the trigger function we also + // want to be able to get informed as soon as the first data got emitted + const triggerResponse = await nodeType.trigger.call(triggerFunctions); + + // Add the manual trigger response which resolves when the first time data got emitted + triggerResponse!.manualTriggerResponse = new Promise((resolve, reject) => { + triggerFunctions.emit = ( + (resolveEmit) => + ( + data: INodeExecutionData[][], + responsePromise?: IDeferredPromise, + donePromise?: IDeferredPromise, + ) => { + additionalData.hooks!.hookFunctions.sendResponse = [ + async (response: IExecuteResponsePromiseData): Promise => { + if (responsePromise) { + responsePromise.resolve(response); + } + }, + ]; + + if (donePromise) { + additionalData.hooks!.hookFunctions.workflowExecuteAfter?.unshift( + async (runData: IRun): Promise => { + return donePromise.resolve(runData); + }, + ); + } + + resolveEmit(data); + } + )(resolve); + triggerFunctions.emitError = ( + (rejectEmit) => + (error: Error, responsePromise?: IDeferredPromise) => { + additionalData.hooks!.hookFunctions.sendResponse = [ + async (): Promise => { + if (responsePromise) { + responsePromise.reject(error); + } + }, + ]; + + rejectEmit(error); + } + )(reject); + }); + + return triggerResponse; + } + // In all other modes simply start the trigger + return await nodeType.trigger.call(triggerFunctions); + } + + /** + * Runs the given poller node so that it can trigger the workflow when the node has data. + */ + async runPoll( + workflow: Workflow, + node: INode, + pollFunctions: IPollFunctions, + ): Promise { + const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); + + if (!nodeType.poll) { + throw new ApplicationError('Node type does not have a poll function defined', { + extra: { nodeName: node.name }, + tags: { nodeType: node.type }, + }); + } + + return await nodeType.poll.call(pollFunctions); + } +} diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index 215119922e..ba855c0b57 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -37,6 +37,9 @@ import type { StartNodeData, NodeExecutionHint, NodeInputConnections, + IRunNodeResponse, + IWorkflowIssues, + INodeIssues, } from 'n8n-workflow'; import { LoggerProxy as Logger, @@ -47,11 +50,13 @@ import { NodeExecutionOutput, sleep, ExecutionCancelledError, + Node, } from 'n8n-workflow'; import PCancelable from 'p-cancelable'; import Container from 'typedi'; import { ErrorReporter } from './error-reporter'; +import { ExecuteContext, PollContext } from './node-execution-context'; import * as NodeExecuteFunctions from './NodeExecuteFunctions'; import { DirectedGraph, @@ -63,6 +68,8 @@ import { handleCycles, filterDisabledNodes, } from './PartialExecutionUtils'; +import { RoutingNode } from './RoutingNode'; +import { TriggersAndPollers } from './TriggersAndPollers'; export class WorkflowExecute { private status: ExecutionStatus = 'new'; @@ -884,6 +891,280 @@ export class WorkflowExecute { } } + /** + * Checks if everything in the workflow is complete + * and ready to be executed. If it returns null everything + * is fine. If there are issues it returns the issues + * which have been found for the different nodes. + * TODO: Does currently not check for credential issues! + */ + checkReadyForExecution( + workflow: Workflow, + inputData: { + startNode?: string; + destinationNode?: string; + pinDataNodeNames?: string[]; + } = {}, + ): IWorkflowIssues | null { + const workflowIssues: IWorkflowIssues = {}; + + let checkNodes: string[] = []; + if (inputData.destinationNode) { + // If a destination node is given we have to check all the nodes + // leading up to it + checkNodes = workflow.getParentNodes(inputData.destinationNode); + checkNodes.push(inputData.destinationNode); + } else if (inputData.startNode) { + // If a start node is given we have to check all nodes which + // come after it + checkNodes = workflow.getChildNodes(inputData.startNode); + checkNodes.push(inputData.startNode); + } + + for (const nodeName of checkNodes) { + let nodeIssues: INodeIssues | null = null; + const node = workflow.nodes[nodeName]; + + if (node.disabled === true) { + continue; + } + + const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); + + if (nodeType === undefined) { + // Node type is not known + nodeIssues = { + typeUnknown: true, + }; + } else { + nodeIssues = NodeHelpers.getNodeParametersIssues( + nodeType.description.properties, + node, + inputData.pinDataNodeNames, + ); + } + + if (nodeIssues !== null) { + workflowIssues[node.name] = nodeIssues; + } + } + + if (Object.keys(workflowIssues).length === 0) { + return null; + } + + return workflowIssues; + } + + /** Executes the given node */ + // eslint-disable-next-line complexity + async runNode( + workflow: Workflow, + executionData: IExecuteData, + runExecutionData: IRunExecutionData, + runIndex: number, + additionalData: IWorkflowExecuteAdditionalData, + mode: WorkflowExecuteMode, + abortSignal?: AbortSignal, + ): Promise { + const { node } = executionData; + let inputData = executionData.data; + + if (node.disabled === true) { + // If node is disabled simply pass the data through + // return NodeRunHelpers. + if (inputData.hasOwnProperty('main') && inputData.main.length > 0) { + // If the node is disabled simply return the data from the first main input + if (inputData.main[0] === null) { + return { data: undefined }; + } + return { data: [inputData.main[0]] }; + } + return { data: undefined }; + } + + const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); + + let connectionInputData: INodeExecutionData[] = []; + if (nodeType.execute || (!nodeType.poll && !nodeType.trigger && !nodeType.webhook)) { + // Only stop if first input is empty for execute runs. For all others run anyways + // because then it is a trigger node. As they only pass data through and so the input-data + // becomes output-data it has to be possible. + + if (inputData.main?.length > 0) { + // We always use the data of main input and the first input for execute + connectionInputData = inputData.main[0] as INodeExecutionData[]; + } + + const forceInputNodeExecution = workflow.settings.executionOrder !== 'v1'; + if (!forceInputNodeExecution) { + // If the nodes do not get force executed data of some inputs may be missing + // for that reason do we use the data of the first one that contains any + for (const mainData of inputData.main) { + if (mainData?.length) { + connectionInputData = mainData; + break; + } + } + } + + if (connectionInputData.length === 0) { + // No data for node so return + return { data: undefined }; + } + } + + if ( + runExecutionData.resultData.lastNodeExecuted === node.name && + runExecutionData.resultData.error !== undefined + ) { + // The node did already fail. So throw an error here that it displays and logs it correctly. + // Does get used by webhook and trigger nodes in case they throw an error that it is possible + // to log the error and display in Editor-UI. + if ( + runExecutionData.resultData.error.name === 'NodeOperationError' || + runExecutionData.resultData.error.name === 'NodeApiError' + ) { + throw runExecutionData.resultData.error; + } + + const error = new Error(runExecutionData.resultData.error.message); + error.stack = runExecutionData.resultData.error.stack; + throw error; + } + + if (node.executeOnce === true) { + // If node should be executed only once so use only the first input item + const newInputData: ITaskDataConnections = {}; + for (const connectionType of Object.keys(inputData)) { + newInputData[connectionType] = inputData[connectionType].map((input) => { + // eslint-disable-next-line @typescript-eslint/prefer-optional-chain + return input && input.slice(0, 1); + }); + } + inputData = newInputData; + } + + if (nodeType.execute) { + const closeFunctions: CloseFunction[] = []; + const context = new ExecuteContext( + workflow, + node, + additionalData, + mode, + runExecutionData, + runIndex, + connectionInputData, + inputData, + executionData, + closeFunctions, + abortSignal, + ); + + const data = + nodeType instanceof Node + ? await nodeType.execute(context) + : await nodeType.execute.call(context); + + const closeFunctionsResults = await Promise.allSettled( + closeFunctions.map(async (fn) => await fn()), + ); + + const closingErrors = closeFunctionsResults + .filter((result): result is PromiseRejectedResult => result.status === 'rejected') + // eslint-disable-next-line @typescript-eslint/no-unsafe-return + .map((result) => result.reason); + + if (closingErrors.length > 0) { + if (closingErrors[0] instanceof Error) throw closingErrors[0]; + throw new ApplicationError("Error on execution node's close function(s)", { + extra: { nodeName: node.name }, + tags: { nodeType: node.type }, + cause: closingErrors, + }); + } + + return { data }; + } else if (nodeType.poll) { + if (mode === 'manual') { + // In manual mode run the poll function + const context = new PollContext(workflow, node, additionalData, mode, 'manual'); + return { data: await nodeType.poll.call(context) }; + } + // In any other mode pass data through as it already contains the result of the poll + return { data: inputData.main as INodeExecutionData[][] }; + } else if (nodeType.trigger) { + if (mode === 'manual') { + // In manual mode start the trigger + const triggerResponse = await Container.get(TriggersAndPollers).runTrigger( + workflow, + node, + NodeExecuteFunctions.getExecuteTriggerFunctions, + additionalData, + mode, + 'manual', + ); + + if (triggerResponse === undefined) { + return { data: null }; + } + + let closeFunction; + if (triggerResponse.closeFunction) { + // In manual mode we return the trigger closeFunction. That allows it to be called directly + // but we do not have to wait for it to finish. That is important for things like queue-nodes. + // There the full close will may be delayed till a message gets acknowledged after the execution. + // If we would not be able to wait for it to close would it cause problems with "own" mode as the + // process would be killed directly after it and so the acknowledge would not have been finished yet. + closeFunction = triggerResponse.closeFunction; + + // Manual testing of Trigger nodes creates an execution. If the execution is cancelled, `closeFunction` should be called to cleanup any open connections/consumers + abortSignal?.addEventListener('abort', closeFunction); + } + + if (triggerResponse.manualTriggerFunction !== undefined) { + // If a manual trigger function is defined call it and wait till it did run + await triggerResponse.manualTriggerFunction(); + } + + const response = await triggerResponse.manualTriggerResponse!; + + if (response.length === 0) { + return { data: null, closeFunction }; + } + + return { data: response, closeFunction }; + } + // For trigger nodes in any mode except "manual" do we simply pass the data through + return { data: inputData.main as INodeExecutionData[][] }; + } else if (nodeType.webhook) { + // For webhook nodes always simply pass the data through + return { data: inputData.main as INodeExecutionData[][] }; + } else { + // For nodes which have routing information on properties + + const routingNode = new RoutingNode( + workflow, + node, + connectionInputData, + runExecutionData ?? null, + additionalData, + mode, + ); + + return { + data: await routingNode.runNode( + inputData, + runIndex, + nodeType, + executionData, + undefined, + abortSignal, + ), + }; + } + } + /** * Runs the given execution data. * @@ -909,7 +1190,7 @@ export class WorkflowExecute { const pinDataNodeNames = Object.keys(this.runExecutionData.resultData.pinData ?? {}); - const workflowIssues = workflow.checkReadyForExecution({ + const workflowIssues = this.checkReadyForExecution(workflow, { startNode, destinationNode, pinDataNodeNames, @@ -1171,12 +1452,12 @@ export class WorkflowExecute { workflowId: workflow.id, }); - let runNodeData = await workflow.runNode( + let runNodeData = await this.runNode( + workflow, executionData, this.runExecutionData, runIndex, this.additionalData, - NodeExecuteFunctions, this.mode, this.abortController.signal, ); @@ -1188,12 +1469,12 @@ export class WorkflowExecute { while (didContinueOnFail && tryIndex !== maxTries - 1) { await sleep(waitBetweenTries); - runNodeData = await workflow.runNode( + runNodeData = await this.runNode( + workflow, executionData, this.runExecutionData, runIndex, this.additionalData, - NodeExecuteFunctions, this.mode, this.abortController.signal, ); @@ -1230,19 +1511,20 @@ export class WorkflowExecute { const closeFunctions: CloseFunction[] = []; // Create a WorkflowDataProxy instance that we can get the data of the // item which did error - const executeFunctions = NodeExecuteFunctions.getExecuteFunctions( + const executeFunctions = new ExecuteContext( workflow, + executionData.node, + this.additionalData, + this.mode, this.runExecutionData, runIndex, [], executionData.data, - executionData.node, - this.additionalData, executionData, - this.mode, closeFunctions, this.abortController.signal, ); + const dataProxy = executeFunctions.getWorkflowDataProxy(0); // Loop over all outputs except the error output as it would not contain data by default diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 51ddd32678..13598d6d2b 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -11,6 +11,7 @@ export * from './DirectoryLoader'; export * from './Interfaces'; export { InstanceSettings, InstanceType } from './InstanceSettings'; export * from './NodeExecuteFunctions'; +export * from './RoutingNode'; export * from './WorkflowExecute'; export { NodeExecuteFunctions }; export * from './data-deduplication-service'; diff --git a/packages/core/test/NodeExecuteFunctions.test.ts b/packages/core/test/NodeExecuteFunctions.test.ts index cb2b43ca02..b1b6e96577 100644 --- a/packages/core/test/NodeExecuteFunctions.test.ts +++ b/packages/core/test/NodeExecuteFunctions.test.ts @@ -675,7 +675,6 @@ describe('NodeExecuteFunctions', () => { beforeEach(() => { nock.cleanAll(); - nock.disableNetConnect(); jest.clearAllMocks(); }); diff --git a/packages/workflow/test/RoutingNode.test.ts b/packages/core/test/RoutingNode.test.ts similarity index 95% rename from packages/workflow/test/RoutingNode.test.ts rename to packages/core/test/RoutingNode.test.ts index a38f4551e2..45ef937803 100644 --- a/packages/workflow/test/RoutingNode.test.ts +++ b/packages/core/test/RoutingNode.test.ts @@ -1,29 +1,30 @@ import { mock } from 'jest-mock-extended'; - +import { get } from 'lodash'; import type { - INode, - INodeExecutionData, - INodeParameters, DeclarativeRestApiSettings, - IRunExecutionData, - INodeProperties, + IExecuteData, IExecuteSingleFunctions, IHttpRequestOptions, - ITaskDataConnections, - INodeExecuteFunctions, + IN8nHttpFullResponse, + IN8nHttpResponse, IN8nRequestOperations, + INode, INodeCredentialDescription, - IExecuteData, + INodeExecutionData, + INodeParameters, + INodeProperties, + INodeType, INodeTypeDescription, + IRunExecutionData, + ITaskDataConnections, IWorkflowExecuteAdditionalData, - IExecuteFunctions, -} from '@/Interfaces'; -import { applyDeclarativeNodeOptionParameters } from '@/NodeHelpers'; -import { RoutingNode } from '@/RoutingNode'; -import * as utilsModule from '@/utils'; -import { Workflow } from '@/Workflow'; +} from 'n8n-workflow'; +import { NodeHelpers, Workflow } from 'n8n-workflow'; -import * as Helpers from './Helpers'; +import * as executionContexts from '@/node-execution-context'; +import { RoutingNode } from '@/RoutingNode'; + +import { NodeTypes } from './helpers'; const postReceiveFunction1 = async function ( this: IExecuteSingleFunctions, @@ -42,14 +43,55 @@ const preSendFunction1 = async function ( return requestOptions; }; +const getExecuteSingleFunctions = ( + workflow: Workflow, + runExecutionData: IRunExecutionData, + runIndex: number, + node: INode, + itemIndex: number, +) => + mock({ + getItemIndex: () => itemIndex, + getNodeParameter: (parameterName: string) => + workflow.expression.getParameterValue( + get(node.parameters, parameterName), + runExecutionData, + runIndex, + itemIndex, + node.name, + [], + 'internal', + {}, + ), + getWorkflow: () => ({ + id: workflow.id, + name: workflow.name, + active: workflow.active, + }), + helpers: mock({ + async httpRequest( + requestOptions: IHttpRequestOptions, + ): Promise { + return { + body: { + headers: {}, + statusCode: 200, + requestOptions, + }, + }; + }, + }), + }); + describe('RoutingNode', () => { + const nodeTypes = NodeTypes(); const additionalData = mock(); test('applyDeclarativeNodeOptionParameters', () => { - const nodeTypes = Helpers.NodeTypes(); + const nodeTypes = NodeTypes(); const nodeType = nodeTypes.getByNameAndVersion('test.setMulti'); - applyDeclarativeNodeOptionParameters(nodeType); + NodeHelpers.applyDeclarativeNodeOptionParameters(nodeType); const options = nodeType.description.properties.find( (property) => property.name === 'requestOptions', @@ -667,7 +709,7 @@ describe('RoutingNode', () => { }, ]; - const nodeTypes = Helpers.NodeTypes(); + const nodeTypes = NodeTypes(); const node: INode = { parameters: {}, name: 'test', @@ -711,7 +753,7 @@ describe('RoutingNode', () => { mode, ); - const executeSingleFunctions = Helpers.getExecuteSingleFunctions( + const executeSingleFunctions = getExecuteSingleFunctions( workflow, runExecutionData, runIndex, @@ -1861,7 +1903,6 @@ describe('RoutingNode', () => { }, ]; - const nodeTypes = Helpers.NodeTypes(); const baseNode: INode = { parameters: {}, name: 'test', @@ -1877,7 +1918,7 @@ describe('RoutingNode', () => { const connectionInputData: INodeExecutionData[] = []; const runExecutionData: IRunExecutionData = { resultData: { runData: {} } }; const nodeType = nodeTypes.getByNameAndVersion(baseNode.type); - applyDeclarativeNodeOptionParameters(nodeType); + NodeHelpers.applyDeclarativeNodeOptionParameters(nodeType); const propertiesOriginal = nodeType.description.properties; @@ -1921,8 +1962,8 @@ describe('RoutingNode', () => { source: null, } as IExecuteData; - const executeFunctions = mock(); - const executeSingleFunctions = Helpers.getExecuteSingleFunctions( + const executeFunctions = mock(); + const executeSingleFunctions = getExecuteSingleFunctions( workflow, runExecutionData, runIndex, @@ -1930,10 +1971,10 @@ describe('RoutingNode', () => { itemIndex, ); - const nodeExecuteFunctions: Partial = { - getExecuteFunctions: () => executeFunctions, - getExecuteSingleFunctions: () => executeSingleFunctions, - }; + jest.spyOn(executionContexts, 'ExecuteContext').mockReturnValue(executeFunctions); + jest + .spyOn(executionContexts, 'ExecuteSingleContext') + .mockReturnValue(executeSingleFunctions); const numberOfItems = testData.input.specialTestOptions?.numberOfItems ?? 1; if (!inputData.main[0] || inputData.main[0].length !== numberOfItems) { @@ -1943,7 +1984,8 @@ describe('RoutingNode', () => { } } - const spy = jest.spyOn(utilsModule, 'sleep').mockReturnValue( + const workflowPackage = await import('n8n-workflow'); + const spy = jest.spyOn(workflowPackage, 'sleep').mockReturnValue( new Promise((resolve) => { resolve(); }), @@ -1956,18 +1998,13 @@ describe('RoutingNode', () => { ); const getNodeParameter = executeSingleFunctions.getNodeParameter; + // @ts-expect-error overwriting a method executeSingleFunctions.getNodeParameter = (parameterName: string) => parameterName in testData.input.node.parameters ? testData.input.node.parameters[parameterName] : (getNodeParameter(parameterName) ?? {}); - const result = await routingNode.runNode( - inputData, - runIndex, - nodeType, - executeData, - nodeExecuteFunctions as INodeExecuteFunctions, - ); + const result = await routingNode.runNode(inputData, runIndex, nodeType, executeData); if (testData.input.specialTestOptions?.sleepCalls) { expect(spy.mock.calls).toEqual(testData.input.specialTestOptions?.sleepCalls); @@ -2042,7 +2079,6 @@ describe('RoutingNode', () => { }, ]; - const nodeTypes = Helpers.NodeTypes(); const baseNode: INode = { parameters: {}, name: 'test', @@ -2052,12 +2088,10 @@ describe('RoutingNode', () => { position: [0, 0], }; - const mode = 'internal'; const runIndex = 0; const itemIndex = 0; - const connectionInputData: INodeExecutionData[] = []; const runExecutionData: IRunExecutionData = { resultData: { runData: {} } }; - const nodeType = nodeTypes.getByNameAndVersion(baseNode.type); + const nodeType = mock(); const inputData: ITaskDataConnections = { main: [ @@ -2093,53 +2127,17 @@ describe('RoutingNode', () => { nodeTypes, }); - const routingNode = new RoutingNode( - workflow, - node, - connectionInputData, - runExecutionData ?? null, - additionalData, - mode, - ); - - const executeData = { - data: {}, - node, - source: null, - } as IExecuteData; - let currentItemIndex = 0; for (let iteration = 0; iteration < inputData.main[0]!.length; iteration++) { - const nodeExecuteFunctions: Partial = { - getExecuteSingleFunctions: () => { - return Helpers.getExecuteSingleFunctions( - workflow, - runExecutionData, - runIndex, - node, - itemIndex + iteration, - ); - }, - }; - - if (!nodeExecuteFunctions.getExecuteSingleFunctions) { - fail('Expected nodeExecuteFunctions to contain getExecuteSingleFunctions'); - } - - const routingNodeExecutionContext = nodeExecuteFunctions.getExecuteSingleFunctions( - routingNode.workflow, - routingNode.runExecutionData, + const context = getExecuteSingleFunctions( + workflow, + runExecutionData, runIndex, - routingNode.connectionInputData, - inputData, - routingNode.node, - iteration, - routingNode.additionalData, - executeData, - routingNode.mode, + node, + itemIndex + iteration, ); - - currentItemIndex = routingNodeExecutionContext.getItemIndex(); + jest.spyOn(executionContexts, 'ExecuteSingleContext').mockReturnValue(context); + currentItemIndex = context.getItemIndex(); } const expectedItemIndex = inputData.main[0]!.length - 1; diff --git a/packages/core/test/TriggersAndPollers.test.ts b/packages/core/test/TriggersAndPollers.test.ts new file mode 100644 index 0000000000..c30a0693a6 --- /dev/null +++ b/packages/core/test/TriggersAndPollers.test.ts @@ -0,0 +1,157 @@ +import { mock } from 'jest-mock-extended'; +import { ApplicationError } from 'n8n-workflow'; +import type { + Workflow, + INode, + INodeExecutionData, + IPollFunctions, + IWorkflowExecuteAdditionalData, + INodeType, + INodeTypes, + ITriggerFunctions, +} from 'n8n-workflow'; + +import { TriggersAndPollers } from '@/TriggersAndPollers'; + +describe('TriggersAndPollers', () => { + const node = mock(); + const nodeType = mock({ + trigger: undefined, + poll: undefined, + }); + const nodeTypes = mock(); + const workflow = mock({ nodeTypes }); + const additionalData = mock({ + hooks: { + hookFunctions: { + sendResponse: [], + }, + }, + }); + const triggersAndPollers = new TriggersAndPollers(); + + beforeEach(() => { + jest.clearAllMocks(); + nodeTypes.getByNameAndVersion.mockReturnValue(nodeType); + }); + + describe('runTrigger()', () => { + const triggerFunctions = mock(); + const getTriggerFunctions = jest.fn().mockReturnValue(triggerFunctions); + const triggerFn = jest.fn(); + + it('should throw error if node type does not have trigger function', async () => { + await expect( + triggersAndPollers.runTrigger( + workflow, + node, + getTriggerFunctions, + additionalData, + 'trigger', + 'init', + ), + ).rejects.toThrow(ApplicationError); + }); + + it('should call trigger function in regular mode', async () => { + nodeType.trigger = triggerFn; + triggerFn.mockResolvedValue({ test: true }); + + const result = await triggersAndPollers.runTrigger( + workflow, + node, + getTriggerFunctions, + additionalData, + 'trigger', + 'init', + ); + + expect(triggerFn).toHaveBeenCalled(); + expect(result).toEqual({ test: true }); + }); + + it('should handle manual mode with promise resolution', async () => { + const mockEmitData: INodeExecutionData[][] = [[{ json: { data: 'test' } }]]; + const mockTriggerResponse = { workflowId: '123' }; + + nodeType.trigger = triggerFn; + triggerFn.mockResolvedValue(mockTriggerResponse); + + const result = await triggersAndPollers.runTrigger( + workflow, + node, + getTriggerFunctions, + additionalData, + 'manual', + 'init', + ); + + expect(result).toBeDefined(); + expect(result?.manualTriggerResponse).toBeInstanceOf(Promise); + + // Simulate emit + const mockTriggerFunctions = getTriggerFunctions.mock.results[0]?.value; + if (mockTriggerFunctions?.emit) { + mockTriggerFunctions.emit(mockEmitData); + } + }); + + it('should handle error emission in manual mode', async () => { + const testError = new Error('Test error'); + + nodeType.trigger = triggerFn; + triggerFn.mockResolvedValue({}); + + const result = await triggersAndPollers.runTrigger( + workflow, + node, + getTriggerFunctions, + additionalData, + 'manual', + 'init', + ); + + expect(result?.manualTriggerResponse).toBeInstanceOf(Promise); + + // Simulate error + const mockTriggerFunctions = getTriggerFunctions.mock.results[0]?.value; + if (mockTriggerFunctions?.emitError) { + mockTriggerFunctions.emitError(testError); + } + + await expect(result?.manualTriggerResponse).rejects.toThrow(testError); + }); + }); + + describe('runPoll()', () => { + const pollFunctions = mock(); + const pollFn = jest.fn(); + + it('should throw error if node type does not have poll function', async () => { + await expect(triggersAndPollers.runPoll(workflow, node, pollFunctions)).rejects.toThrow( + ApplicationError, + ); + }); + + it('should call poll function and return result', async () => { + const mockPollResult: INodeExecutionData[][] = [[{ json: { data: 'test' } }]]; + nodeType.poll = pollFn; + pollFn.mockResolvedValue(mockPollResult); + + const result = await triggersAndPollers.runPoll(workflow, node, pollFunctions); + + expect(pollFn).toHaveBeenCalled(); + expect(result).toBe(mockPollResult); + }); + + it('should return null if poll function returns no data', async () => { + nodeType.poll = pollFn; + pollFn.mockResolvedValue(null); + + const result = await triggersAndPollers.runPoll(workflow, node, pollFunctions); + + expect(pollFn).toHaveBeenCalled(); + expect(result).toBeNull(); + }); + }); +}); diff --git a/packages/core/test/WorkflowExecute.test.ts b/packages/core/test/WorkflowExecute.test.ts index f6f9bd2cbf..6a826a8118 100644 --- a/packages/core/test/WorkflowExecute.test.ts +++ b/packages/core/test/WorkflowExecute.test.ts @@ -9,12 +9,26 @@ // XX denotes that the node is disabled // PD denotes that the node has pinned data +import { mock } from 'jest-mock-extended'; import { pick } from 'lodash'; -import type { IPinData, IRun, IRunData, WorkflowTestData } from 'n8n-workflow'; +import type { + IExecuteData, + INode, + INodeType, + INodeTypes, + IPinData, + IRun, + IRunData, + IRunExecutionData, + ITriggerResponse, + IWorkflowExecuteAdditionalData, + WorkflowTestData, +} from 'n8n-workflow'; import { ApplicationError, createDeferredPromise, NodeExecutionOutput, + NodeHelpers, Workflow, } from 'n8n-workflow'; @@ -444,4 +458,150 @@ describe('WorkflowExecute', () => { ); }); }); + + describe('checkReadyForExecution', () => { + const disabledNode = mock({ name: 'Disabled Node', disabled: true }); + const startNode = mock({ name: 'Start Node' }); + const unknownNode = mock({ name: 'Unknown Node', type: 'unknownNode' }); + + const nodeParamIssuesSpy = jest.spyOn(NodeHelpers, 'getNodeParametersIssues'); + + const nodeTypes = mock(); + nodeTypes.getByNameAndVersion.mockImplementation((type) => { + // TODO: getByNameAndVersion signature needs to be updated to allow returning undefined + if (type === 'unknownNode') return undefined as unknown as INodeType; + return mock({ + description: { + properties: [], + }, + }); + }); + const workflowExecute = new WorkflowExecute(mock(), 'manual'); + + beforeEach(() => jest.clearAllMocks()); + + it('should return null if there are no nodes', () => { + const workflow = new Workflow({ + nodes: [], + connections: {}, + active: false, + nodeTypes, + }); + + const issues = workflowExecute.checkReadyForExecution(workflow); + expect(issues).toBe(null); + expect(nodeTypes.getByNameAndVersion).not.toHaveBeenCalled(); + expect(nodeParamIssuesSpy).not.toHaveBeenCalled(); + }); + + it('should return null if there are no enabled nodes', () => { + const workflow = new Workflow({ + nodes: [disabledNode], + connections: {}, + active: false, + nodeTypes, + }); + + const issues = workflowExecute.checkReadyForExecution(workflow, { + startNode: disabledNode.name, + }); + expect(issues).toBe(null); + expect(nodeTypes.getByNameAndVersion).toHaveBeenCalledTimes(1); + expect(nodeParamIssuesSpy).not.toHaveBeenCalled(); + }); + + it('should return typeUnknown for unknown nodes', () => { + const workflow = new Workflow({ + nodes: [unknownNode], + connections: {}, + active: false, + nodeTypes, + }); + + const issues = workflowExecute.checkReadyForExecution(workflow, { + startNode: unknownNode.name, + }); + expect(issues).toEqual({ [unknownNode.name]: { typeUnknown: true } }); + expect(nodeTypes.getByNameAndVersion).toHaveBeenCalledTimes(2); + expect(nodeParamIssuesSpy).not.toHaveBeenCalled(); + }); + + it('should return issues for regular nodes', () => { + const workflow = new Workflow({ + nodes: [startNode], + connections: {}, + active: false, + nodeTypes, + }); + nodeParamIssuesSpy.mockReturnValue({ execution: false }); + + const issues = workflowExecute.checkReadyForExecution(workflow, { + startNode: startNode.name, + }); + expect(issues).toEqual({ [startNode.name]: { execution: false } }); + expect(nodeTypes.getByNameAndVersion).toHaveBeenCalledTimes(2); + expect(nodeParamIssuesSpy).toHaveBeenCalled(); + }); + }); + + describe('runNode', () => { + const nodeTypes = mock(); + const triggerNode = mock(); + const triggerResponse = mock({ + closeFunction: jest.fn(), + // This node should never trigger, or return + manualTriggerFunction: async () => await new Promise(() => {}), + }); + const triggerNodeType = mock({ + description: { + properties: [], + }, + execute: undefined, + poll: undefined, + webhook: undefined, + async trigger() { + return triggerResponse; + }, + }); + + nodeTypes.getByNameAndVersion.mockReturnValue(triggerNodeType); + + const workflow = new Workflow({ + nodeTypes, + nodes: [triggerNode], + connections: {}, + active: false, + }); + + const executionData = mock(); + const runExecutionData = mock(); + const additionalData = mock(); + const abortController = new AbortController(); + const workflowExecute = new WorkflowExecute(additionalData, 'manual'); + + test('should call closeFunction when manual trigger is aborted', async () => { + const runPromise = workflowExecute.runNode( + workflow, + executionData, + runExecutionData, + 0, + additionalData, + 'manual', + abortController.signal, + ); + // Yield back to the event-loop to let async parts of `runNode` execute + await new Promise((resolve) => setImmediate(resolve)); + + let isSettled = false; + void runPromise.then(() => { + isSettled = true; + }); + expect(isSettled).toBe(false); + expect(abortController.signal.aborted).toBe(false); + expect(triggerResponse.closeFunction).not.toHaveBeenCalled(); + + abortController.abort(); + expect(triggerResponse.closeFunction).toHaveBeenCalled(); + }); + }); }); diff --git a/packages/core/test/helpers/constants.ts b/packages/core/test/helpers/constants.ts index ac8c366866..e67caadd1e 100644 --- a/packages/core/test/helpers/constants.ts +++ b/packages/core/test/helpers/constants.ts @@ -102,6 +102,89 @@ export const predefinedNodesTypes: INodeTypeData = { }, }, }, + 'test.set': { + sourcePath: '', + type: { + description: { + displayName: 'Set', + name: 'set', + group: ['input'], + version: 1, + description: 'Sets a value', + defaults: { + name: 'Set', + color: '#0000FF', + }, + inputs: [NodeConnectionType.Main], + outputs: [NodeConnectionType.Main], + properties: [ + { + displayName: 'Value1', + name: 'value1', + type: 'string', + default: 'default-value1', + }, + { + displayName: 'Value2', + name: 'value2', + type: 'string', + default: 'default-value2', + }, + ], + }, + }, + }, + 'test.setMulti': { + sourcePath: '', + type: { + description: { + displayName: 'Set Multi', + name: 'setMulti', + group: ['input'], + version: 1, + description: 'Sets multiple values', + defaults: { + name: 'Set Multi', + color: '#0000FF', + }, + inputs: [NodeConnectionType.Main], + outputs: [NodeConnectionType.Main], + properties: [ + { + displayName: 'Values', + name: 'values', + type: 'fixedCollection', + typeOptions: { + multipleValues: true, + }, + default: {}, + options: [ + { + name: 'string', + displayName: 'String', + values: [ + { + displayName: 'Name', + name: 'name', + type: 'string', + default: 'propertyName', + placeholder: 'Name of the property to write data to.', + }, + { + displayName: 'Value', + name: 'value', + type: 'string', + default: '', + placeholder: 'The string value to write in the property.', + }, + ], + }, + ], + }, + ], + }, + }, + }, }; export const legacyWorkflowExecuteTests: WorkflowTestData[] = [ diff --git a/packages/core/test/helpers/index.ts b/packages/core/test/helpers/index.ts index 2ced1a423b..14f19789b0 100644 --- a/packages/core/test/helpers/index.ts +++ b/packages/core/test/helpers/index.ts @@ -24,7 +24,7 @@ import { predefinedNodesTypes } from './constants'; const BASE_DIR = path.resolve(__dirname, '../../..'); class NodeTypesClass implements INodeTypes { - constructor(private nodeTypes: INodeTypeData = predefinedNodesTypes) {} + constructor(private nodeTypes: INodeTypeData) {} getByName(nodeType: string): INodeType | IVersionedNodeType { return this.nodeTypes[nodeType].type; @@ -41,7 +41,7 @@ class NodeTypesClass implements INodeTypes { let nodeTypesInstance: NodeTypesClass | undefined; -export function NodeTypes(nodeTypes?: INodeTypeData): INodeTypes { +export function NodeTypes(nodeTypes: INodeTypeData = predefinedNodesTypes): INodeTypes { if (nodeTypesInstance === undefined || nodeTypes !== undefined) { nodeTypesInstance = new NodeTypesClass(nodeTypes); } diff --git a/packages/editor-ui/src/components/NodeSettings.vue b/packages/editor-ui/src/components/NodeSettings.vue index 5256f41983..c7e41fb00f 100644 --- a/packages/editor-ui/src/components/NodeSettings.vue +++ b/packages/editor-ui/src/components/NodeSettings.vue @@ -162,7 +162,7 @@ const executeButtonTooltip = computed(() => { node.value && isLatestNodeVersion.value && props.inputSize > 1 && - !NodeHelpers.isSingleExecution(node.value.type, node.value.parameters) + !nodeHelpers.isSingleExecution(node.value.type, node.value.parameters) ) { return i18n.baseText('nodeSettings.executeButtonTooltip.times', { interpolate: { inputSize: props.inputSize }, diff --git a/packages/editor-ui/src/composables/useNodeHelpers.test.ts b/packages/editor-ui/src/composables/useNodeHelpers.test.ts index 538a40c5e2..c5ba6dc1f3 100644 --- a/packages/editor-ui/src/composables/useNodeHelpers.test.ts +++ b/packages/editor-ui/src/composables/useNodeHelpers.test.ts @@ -234,4 +234,37 @@ describe('useNodeHelpers()', () => { expect(webhook.webhookId).toMatch(/\w+(-\w+)+/); }); }); + + describe('isSingleExecution', () => { + let isSingleExecution: ReturnType['isSingleExecution']; + beforeEach(() => { + isSingleExecution = useNodeHelpers().isSingleExecution; + }); + + test('should determine based on node parameters if it would be executed once', () => { + expect(isSingleExecution('n8n-nodes-base.code', {})).toEqual(true); + expect(isSingleExecution('n8n-nodes-base.code', { mode: 'runOnceForEachItem' })).toEqual( + false, + ); + expect(isSingleExecution('n8n-nodes-base.executeWorkflow', {})).toEqual(true); + expect(isSingleExecution('n8n-nodes-base.executeWorkflow', { mode: 'each' })).toEqual(false); + expect(isSingleExecution('n8n-nodes-base.crateDb', {})).toEqual(true); + expect(isSingleExecution('n8n-nodes-base.crateDb', { operation: 'update' })).toEqual(true); + expect(isSingleExecution('n8n-nodes-base.timescaleDb', {})).toEqual(true); + expect(isSingleExecution('n8n-nodes-base.timescaleDb', { operation: 'update' })).toEqual( + true, + ); + expect(isSingleExecution('n8n-nodes-base.microsoftSql', {})).toEqual(true); + expect(isSingleExecution('n8n-nodes-base.microsoftSql', { operation: 'update' })).toEqual( + true, + ); + expect(isSingleExecution('n8n-nodes-base.microsoftSql', { operation: 'delete' })).toEqual( + true, + ); + expect(isSingleExecution('n8n-nodes-base.questDb', {})).toEqual(true); + expect(isSingleExecution('n8n-nodes-base.mongoDb', { operation: 'insert' })).toEqual(true); + expect(isSingleExecution('n8n-nodes-base.mongoDb', { operation: 'update' })).toEqual(true); + expect(isSingleExecution('n8n-nodes-base.redis', {})).toEqual(true); + }); + }); }); diff --git a/packages/editor-ui/src/composables/useNodeHelpers.ts b/packages/editor-ui/src/composables/useNodeHelpers.ts index 95ab27d7ec..461f550212 100644 --- a/packages/editor-ui/src/composables/useNodeHelpers.ts +++ b/packages/editor-ui/src/composables/useNodeHelpers.ts @@ -35,6 +35,7 @@ import type { INodeTypeNameVersion, IConnection, IPinData, + NodeParameterValue, } from 'n8n-workflow'; import type { @@ -1268,6 +1269,50 @@ export function useNodeHelpers() { return id; } + /** nodes that would execute only once with such parameters add 'undefined' to parameters values if it is parameter's default value */ + const SINGLE_EXECUTION_NODES: { [key: string]: { [key: string]: NodeParameterValue[] } } = { + 'n8n-nodes-base.code': { + mode: [undefined, 'runOnceForAllItems'], + }, + 'n8n-nodes-base.executeWorkflow': { + mode: [undefined, 'once'], + }, + 'n8n-nodes-base.crateDb': { + operation: [undefined, 'update'], // default insert + }, + 'n8n-nodes-base.timescaleDb': { + operation: [undefined, 'update'], // default insert + }, + 'n8n-nodes-base.microsoftSql': { + operation: [undefined, 'update', 'delete'], // default insert + }, + 'n8n-nodes-base.questDb': { + operation: [undefined], // default insert + }, + 'n8n-nodes-base.mongoDb': { + operation: ['insert', 'update'], + }, + 'n8n-nodes-base.redis': { + operation: [undefined], // default info + }, + }; + + function isSingleExecution(type: string, parameters: INodeParameters): boolean { + const singleExecutionCase = SINGLE_EXECUTION_NODES[type]; + + if (singleExecutionCase) { + for (const parameter of Object.keys(singleExecutionCase)) { + if (!singleExecutionCase[parameter].includes(parameters[parameter] as NodeParameterValue)) { + return false; + } + } + + return true; + } + + return false; + } + return { hasProxyAuth, isCustomApiCallSelected, @@ -1305,5 +1350,6 @@ export function useNodeHelpers() { getNodeTaskData, assignNodeId, assignWebhookId, + isSingleExecution, }; } diff --git a/packages/workflow/src/Constants.ts b/packages/workflow/src/Constants.ts index ea05e58c59..8f35e5f257 100644 --- a/packages/workflow/src/Constants.ts +++ b/packages/workflow/src/Constants.ts @@ -1,5 +1,3 @@ -import type { NodeParameterValue } from './Interfaces'; - export const DIGITS = '0123456789'; export const UPPERCASE_LETTERS = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'; export const LOWERCASE_LETTERS = UPPERCASE_LETTERS.toLowerCase(); @@ -87,35 +85,6 @@ export const LANGCHAIN_CUSTOM_TOOLS = [ HTTP_REQUEST_TOOL_LANGCHAIN_NODE_TYPE, ]; -//nodes that would execute only once with such parameters -//add 'undefined' to parameters values if it is parameter's default value -export const SINGLE_EXECUTION_NODES: { [key: string]: { [key: string]: NodeParameterValue[] } } = { - 'n8n-nodes-base.code': { - mode: [undefined, 'runOnceForAllItems'], - }, - 'n8n-nodes-base.executeWorkflow': { - mode: [undefined, 'once'], - }, - 'n8n-nodes-base.crateDb': { - operation: [undefined, 'update'], // default insert - }, - 'n8n-nodes-base.timescaleDb': { - operation: [undefined, 'update'], // default insert - }, - 'n8n-nodes-base.microsoftSql': { - operation: [undefined, 'update', 'delete'], // default insert - }, - 'n8n-nodes-base.questDb': { - operation: [undefined], // default insert - }, - 'n8n-nodes-base.mongoDb': { - operation: ['insert', 'update'], - }, - 'n8n-nodes-base.redis': { - operation: [undefined], // default info - }, -}; - export const SEND_AND_WAIT_OPERATION = 'sendAndWait'; export const AI_TRANSFORM_CODE_GENERATED_FOR_PROMPT = 'codeGeneratedForPrompt'; export const AI_TRANSFORM_JS_CODE = 'jsCode'; diff --git a/packages/workflow/src/Interfaces.ts b/packages/workflow/src/Interfaces.ts index 77325caf03..4f5bf9e6d1 100644 --- a/packages/workflow/src/Interfaces.ts +++ b/packages/workflow/src/Interfaces.ts @@ -421,60 +421,6 @@ export interface IRunNodeResponse { data: INodeExecutionData[][] | NodeExecutionOutput | null | undefined; closeFunction?: CloseFunction; } -export interface IGetExecuteFunctions { - ( - workflow: Workflow, - runExecutionData: IRunExecutionData, - runIndex: number, - connectionInputData: INodeExecutionData[], - inputData: ITaskDataConnections, - node: INode, - additionalData: IWorkflowExecuteAdditionalData, - executeData: IExecuteData, - mode: WorkflowExecuteMode, - closeFunctions: CloseFunction[], - abortSignal?: AbortSignal, - ): IExecuteFunctions; -} - -export interface IGetExecuteSingleFunctions { - ( - workflow: Workflow, - runExecutionData: IRunExecutionData, - runIndex: number, - connectionInputData: INodeExecutionData[], - inputData: ITaskDataConnections, - node: INode, - itemIndex: number, - additionalData: IWorkflowExecuteAdditionalData, - executeData: IExecuteData, - mode: WorkflowExecuteMode, - abortSignal?: AbortSignal, - ): IExecuteSingleFunctions; -} - -export interface IGetExecuteHookFunctions { - ( - workflow: Workflow, - node: INode, - additionalData: IWorkflowExecuteAdditionalData, - mode: WorkflowExecuteMode, - activation: WorkflowActivateMode, - webhookData?: IWebhookData, - ): IHookFunctions; -} - -export interface IGetExecuteWebhookFunctions { - ( - workflow: Workflow, - node: INode, - additionalData: IWorkflowExecuteAdditionalData, - mode: WorkflowExecuteMode, - webhookData: IWebhookData, - closeFunctions: CloseFunction[], - runExecutionData: IRunExecutionData | null, - ): IWebhookFunctions; -} export interface ISourceDataConnections { // Key for each input type and because there can be multiple inputs of the same type it is an array @@ -1226,10 +1172,6 @@ export interface INodeExecutionData { export interface INodeExecuteFunctions { getExecutePollFunctions: IGetExecutePollFunctions; getExecuteTriggerFunctions: IGetExecuteTriggerFunctions; - getExecuteFunctions: IGetExecuteFunctions; - getExecuteSingleFunctions: IGetExecuteSingleFunctions; - getExecuteHookFunctions: IGetExecuteHookFunctions; - getExecuteWebhookFunctions: IGetExecuteWebhookFunctions; } export type NodeParameterValue = string | number | boolean | undefined | null; diff --git a/packages/workflow/src/NodeHelpers.ts b/packages/workflow/src/NodeHelpers.ts index 5f2ad7ec96..114142bc34 100644 --- a/packages/workflow/src/NodeHelpers.ts +++ b/packages/workflow/src/NodeHelpers.ts @@ -6,13 +6,11 @@ import get from 'lodash/get'; import isEqual from 'lodash/isEqual'; -import { SINGLE_EXECUTION_NODES } from './Constants'; import { ApplicationError } from './errors/application.error'; import { NodeConnectionType } from './Interfaces'; import type { FieldType, IContextObject, - IHttpRequestMethods, INode, INodeCredentialDescription, INodeIssueObjectProperty, @@ -29,12 +27,9 @@ import type { IParameterDependencies, IRunExecutionData, IVersionedNodeType, - IWebhookData, - IWorkflowExecuteAdditionalData, NodeParameterValue, ResourceMapperValue, INodeTypeDescription, - INodeTypeBaseDescription, INodeOutputConfiguration, INodeInputConfiguration, GenericValue, @@ -239,33 +234,6 @@ export const cronNodeOptions: INodePropertyCollection[] = [ }, ]; -const commonPollingParameters: INodeProperties[] = [ - { - displayName: 'Poll Times', - name: 'pollTimes', - type: 'fixedCollection', - typeOptions: { - multipleValues: true, - multipleValueButtonText: 'Add Poll Time', - }, - default: { item: [{ mode: 'everyMinute' }] }, - description: 'Time at which polling should occur', - placeholder: 'Add Poll Time', - options: cronNodeOptions, - }, -]; - -export const commonCORSParameters: INodeProperties[] = [ - { - displayName: 'Allowed Origins (CORS)', - name: 'allowedOrigins', - type: 'string', - default: '*', - description: - 'Comma-separated list of URLs allowed for cross-origin non-preflight requests. Use * (default) to allow all origins.', - }, -]; - const declarativeNodeOptionParameters: INodeProperties = { displayName: 'Request Options', name: 'requestOptions', @@ -347,101 +315,6 @@ const declarativeNodeOptionParameters: INodeProperties = { ], }; -/** - * Modifies the description of the passed in object, such that it can be used - * as an AI Agent Tool. - * Returns the modified item (not copied) - */ -export function convertNodeToAiTool< - T extends object & { description: INodeTypeDescription | INodeTypeBaseDescription }, ->(item: T): T { - // quick helper function for type-guard down below - function isFullDescription(obj: unknown): obj is INodeTypeDescription { - return typeof obj === 'object' && obj !== null && 'properties' in obj; - } - - if (isFullDescription(item.description)) { - item.description.name += 'Tool'; - item.description.inputs = []; - item.description.outputs = [NodeConnectionType.AiTool]; - item.description.displayName += ' Tool'; - delete item.description.usableAsTool; - - const hasResource = item.description.properties.some((prop) => prop.name === 'resource'); - const hasOperation = item.description.properties.some((prop) => prop.name === 'operation'); - - if (!item.description.properties.map((prop) => prop.name).includes('toolDescription')) { - const descriptionType: INodeProperties = { - displayName: 'Tool Description', - name: 'descriptionType', - type: 'options', - noDataExpression: true, - options: [ - { - name: 'Set Automatically', - value: 'auto', - description: 'Automatically set based on resource and operation', - }, - { - name: 'Set Manually', - value: 'manual', - description: 'Manually set the description', - }, - ], - default: 'auto', - }; - - const descProp: INodeProperties = { - displayName: 'Description', - name: 'toolDescription', - type: 'string', - default: item.description.description, - required: true, - typeOptions: { rows: 2 }, - description: - 'Explain to the LLM what this tool does, a good, specific description would allow LLMs to produce expected results much more often', - placeholder: `e.g. ${item.description.description}`, - }; - - const noticeProp: INodeProperties = { - displayName: - "Use the expression {{ $fromAI('placeholder_name') }} for any data to be filled by the model", - name: 'notice', - type: 'notice', - default: '', - }; - - item.description.properties.unshift(descProp); - - // If node has resource or operation we can determine pre-populate tool description based on it - // so we add the descriptionType property as the first property - if (hasResource || hasOperation) { - item.description.properties.unshift(descriptionType); - - descProp.displayOptions = { - show: { - descriptionType: ['manual'], - }, - }; - } - - item.description.properties.unshift(noticeProp); - } - } - - const resources = item.description.codex?.resources ?? {}; - - item.description.codex = { - categories: ['AI'], - subcategories: { - AI: ['Tools'], - Tools: ['Other Tools'], - }, - resources, - }; - return item; -} - /** * Determines if the provided node type has any output types other than the main connection type. * @param typeDescription The node's type description to check. @@ -514,27 +387,6 @@ export function applyDeclarativeNodeOptionParameters(nodeType: INodeType): void return; } -/** - * Apply special parameters which should be added to nodeTypes depending on their type or configuration - */ -export function applySpecialNodeParameters(nodeType: INodeType): void { - const { properties, polling, supportsCORS } = nodeType.description; - if (polling) { - properties.unshift(...commonPollingParameters); - } - if (nodeType.webhook && supportsCORS) { - const optionsProperty = properties.find(({ name }) => name === 'options'); - if (optionsProperty) - optionsProperty.options = [ - ...commonCORSParameters, - ...(optionsProperty.options as INodePropertyOptions[]), - ]; - else properties.push(...commonCORSParameters); - } - - applyDeclarativeNodeOptionParameters(nodeType); -} - const getPropertyValues = ( nodeValues: INodeParameters, propertyName: string, @@ -747,7 +599,6 @@ export function getContext( /** * Returns which parameters are dependent on which - * */ function getParameterDependencies(nodePropertiesArray: INodeProperties[]): IParameterDependencies { const dependencies: IParameterDependencies = {}; @@ -783,7 +634,6 @@ function getParameterDependencies(nodePropertiesArray: INodeProperties[]): IPara /** * Returns in which order the parameters should be resolved * to have the parameters available they depend on - * */ export function getParameterResolveOrder( nodePropertiesArray: INodeProperties[], @@ -1177,121 +1027,8 @@ export function getNodeParameters( return nodeParameters; } -/** - * Returns all the webhooks which should be created for the give node - */ -export function getNodeWebhooks( - workflow: Workflow, - node: INode, - additionalData: IWorkflowExecuteAdditionalData, - ignoreRestartWebhooks = false, -): IWebhookData[] { - if (node.disabled === true) { - // Node is disabled so webhooks will also not be enabled - return []; - } - - const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); - - if (nodeType.description.webhooks === undefined) { - // Node does not have any webhooks so return - return []; - } - - const workflowId = workflow.id || '__UNSAVED__'; - const mode = 'internal'; - - const returnData: IWebhookData[] = []; - for (const webhookDescription of nodeType.description.webhooks) { - if (ignoreRestartWebhooks && webhookDescription.restartWebhook === true) { - continue; - } - - let nodeWebhookPath = workflow.expression.getSimpleParameterValue( - node, - webhookDescription.path, - mode, - {}, - ); - if (nodeWebhookPath === undefined) { - // TODO: Use a proper logger - console.error( - `No webhook path could be found for node "${node.name}" in workflow "${workflowId}".`, - ); - continue; - } - - nodeWebhookPath = nodeWebhookPath.toString(); - - if (nodeWebhookPath.startsWith('/')) { - nodeWebhookPath = nodeWebhookPath.slice(1); - } - if (nodeWebhookPath.endsWith('/')) { - nodeWebhookPath = nodeWebhookPath.slice(0, -1); - } - - const isFullPath: boolean = workflow.expression.getSimpleParameterValue( - node, - webhookDescription.isFullPath, - 'internal', - {}, - undefined, - false, - ) as boolean; - const restartWebhook: boolean = workflow.expression.getSimpleParameterValue( - node, - webhookDescription.restartWebhook, - 'internal', - {}, - undefined, - false, - ) as boolean; - const path = getNodeWebhookPath(workflowId, node, nodeWebhookPath, isFullPath, restartWebhook); - - const webhookMethods = workflow.expression.getSimpleParameterValue( - node, - webhookDescription.httpMethod, - mode, - {}, - undefined, - 'GET', - ); - - if (webhookMethods === undefined) { - // TODO: Use a proper logger - console.error( - `The webhook "${path}" for node "${node.name}" in workflow "${workflowId}" could not be added because the httpMethod is not defined.`, - ); - continue; - } - - let webhookId: string | undefined; - if ((path.startsWith(':') || path.includes('/:')) && node.webhookId) { - webhookId = node.webhookId; - } - - String(webhookMethods) - .split(',') - .forEach((httpMethod) => { - if (!httpMethod) return; - returnData.push({ - httpMethod: httpMethod.trim() as IHttpRequestMethods, - node: node.name, - path, - webhookDescription, - workflowId, - workflowExecuteAdditionalData: additionalData, - webhookId, - }); - }); - } - - return returnData; -} - /** * Returns the webhook path - * */ export function getNodeWebhookPath( workflowId: string, @@ -1317,7 +1054,6 @@ export function getNodeWebhookPath( /** * Returns the webhook URL - * */ export function getNodeWebhookUrl( baseUrl: string, @@ -1561,9 +1297,8 @@ export function nodeIssuesToString(issues: INodeIssues, node?: INode): string[] /* * Validates resource locator node parameters based on validation ruled defined in each parameter mode - * */ -export const validateResourceLocatorParameter = ( +const validateResourceLocatorParameter = ( value: INodeParameterResourceLocator, parameterMode: INodePropertyMode, ): string[] => { @@ -1592,9 +1327,8 @@ export const validateResourceLocatorParameter = ( /* * Validates resource mapper values based on service schema - * */ -export const validateResourceMapperParameter = ( +const validateResourceMapperParameter = ( nodeProperties: INodeProperties, value: ResourceMapperValue, skipRequiredCheck = false, @@ -1633,7 +1367,7 @@ export const validateResourceMapperParameter = ( return issues; }; -export const validateParameter = ( +const validateParameter = ( nodeProperties: INodeProperties, value: GenericValue, type: FieldType, @@ -1661,7 +1395,7 @@ export const validateParameter = ( * @param {INodeProperties} nodeProperties The properties of the node * @param {NodeParameterValue} value The value of the parameter */ -export function addToIssuesIfMissing( +function addToIssuesIfMissing( foundIssues: INodeIssues, nodeProperties: INodeProperties, value: NodeParameterValue | INodeParameterResourceLocator, @@ -1936,7 +1670,6 @@ export function mergeIssues(destination: INodeIssues, source: INodeIssues | null /** * Merges the given node properties - * */ export function mergeNodeProperties( mainProperties: INodeProperties[], @@ -1967,19 +1700,3 @@ export function getVersionedNodeType( } return object; } - -export function isSingleExecution(type: string, parameters: INodeParameters): boolean { - const singleExecutionCase = SINGLE_EXECUTION_NODES[type]; - - if (singleExecutionCase) { - for (const parameter of Object.keys(singleExecutionCase)) { - if (!singleExecutionCase[parameter].includes(parameters[parameter] as NodeParameterValue)) { - return false; - } - } - - return true; - } - - return false; -} diff --git a/packages/workflow/src/Workflow.ts b/packages/workflow/src/Workflow.ts index d9e3dee8da..b63a83105a 100644 --- a/packages/workflow/src/Workflow.ts +++ b/packages/workflow/src/Workflow.ts @@ -1,62 +1,36 @@ /* eslint-disable @typescript-eslint/no-use-before-define */ - /* eslint-disable @typescript-eslint/no-unsafe-member-access */ - /* eslint-disable @typescript-eslint/no-unsafe-return */ /* eslint-disable @typescript-eslint/no-for-in-array */ - -/* eslint-disable @typescript-eslint/prefer-nullish-coalescing */ - import { MANUAL_CHAT_TRIGGER_LANGCHAIN_NODE_TYPE, NODES_WITH_RENAMABLE_CONTENT, STARTING_NODE_TYPES, } from './Constants'; -import type { IDeferredPromise } from './DeferredPromise'; import { ApplicationError } from './errors/application.error'; import { Expression } from './Expression'; import { getGlobalState } from './GlobalState'; import type { IConnections, - IExecuteResponsePromiseData, - IGetExecuteTriggerFunctions, INode, - INodeExecuteFunctions, INodeExecutionData, - INodeIssues, INodeParameters, INodes, INodeType, INodeTypes, IPinData, - IPollFunctions, - IRunExecutionData, - ITaskDataConnections, - ITriggerResponse, - IWebhookData, - IWebhookResponseData, - IWorkflowIssues, - IWorkflowExecuteAdditionalData, IWorkflowSettings, - WebhookSetupMethodNames, - WorkflowActivateMode, - WorkflowExecuteMode, IConnection, IConnectedNode, IDataObject, - IExecuteData, INodeConnection, IObservableObject, - IRun, - IRunNodeResponse, NodeParameterValueType, - CloseFunction, INodeOutputConfiguration, } from './Interfaces'; -import { Node, NodeConnectionType } from './Interfaces'; +import { NodeConnectionType } from './Interfaces'; import * as NodeHelpers from './NodeHelpers'; import * as ObservableObject from './ObservableObject'; -import { RoutingNode } from './RoutingNode'; function dedupe(arr: T[]): T[] { return [...new Set(arr)]; @@ -214,112 +188,6 @@ export class Workflow { return returnConnection; } - /** - * A workflow can only be activated if it has a node which has either triggers - * or webhooks defined. - * - * @param {string[]} [ignoreNodeTypes] Node-types to ignore in the check - */ - checkIfWorkflowCanBeActivated(ignoreNodeTypes?: string[]): boolean { - let node: INode; - let nodeType: INodeType | undefined; - - for (const nodeName of Object.keys(this.nodes)) { - node = this.nodes[nodeName]; - - if (node.disabled === true) { - // Deactivated nodes can not trigger a run so ignore - continue; - } - - if (ignoreNodeTypes !== undefined && ignoreNodeTypes.includes(node.type)) { - continue; - } - - nodeType = this.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); - - if (nodeType === undefined) { - // Type is not known so check is not possible - continue; - } - - if ( - nodeType.poll !== undefined || - nodeType.trigger !== undefined || - nodeType.webhook !== undefined - ) { - // Is a trigger node. So workflow can be activated. - return true; - } - } - - return false; - } - - /** - * Checks if everything in the workflow is complete - * and ready to be executed. If it returns null everything - * is fine. If there are issues it returns the issues - * which have been found for the different nodes. - * TODO: Does currently not check for credential issues! - */ - checkReadyForExecution( - inputData: { - startNode?: string; - destinationNode?: string; - pinDataNodeNames?: string[]; - } = {}, - ): IWorkflowIssues | null { - const workflowIssues: IWorkflowIssues = {}; - - let checkNodes: string[] = []; - if (inputData.destinationNode) { - // If a destination node is given we have to check all the nodes - // leading up to it - checkNodes = this.getParentNodes(inputData.destinationNode); - checkNodes.push(inputData.destinationNode); - } else if (inputData.startNode) { - // If a start node is given we have to check all nodes which - // come after it - checkNodes = this.getChildNodes(inputData.startNode); - checkNodes.push(inputData.startNode); - } - - for (const nodeName of checkNodes) { - let nodeIssues: INodeIssues | null = null; - const node = this.nodes[nodeName]; - - if (node.disabled === true) { - continue; - } - - const nodeType = this.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); - - if (nodeType === undefined) { - // Node type is not known - nodeIssues = { - typeUnknown: true, - }; - } else { - nodeIssues = NodeHelpers.getNodeParametersIssues( - nodeType.description.properties, - node, - inputData.pinDataNodeNames, - ); - } - - if (nodeIssues !== null) { - workflowIssues[node.name] = nodeIssues; - } - } - - if (Object.keys(workflowIssues).length === 0) { - return null; - } - - return workflowIssues; - } - /** * Returns the static data of the workflow. * It gets saved with the workflow and will be the same for @@ -1065,437 +933,6 @@ export class Workflow { return this.__getStartNode(Object.keys(this.nodes)); } - - async createWebhookIfNotExists( - webhookData: IWebhookData, - nodeExecuteFunctions: INodeExecuteFunctions, - mode: WorkflowExecuteMode, - activation: WorkflowActivateMode, - ): Promise { - const webhookExists = await this.runWebhookMethod( - 'checkExists', - webhookData, - nodeExecuteFunctions, - mode, - activation, - ); - if (!webhookExists) { - // If webhook does not exist yet create it - await this.runWebhookMethod('create', webhookData, nodeExecuteFunctions, mode, activation); - } - } - - async deleteWebhook( - webhookData: IWebhookData, - nodeExecuteFunctions: INodeExecuteFunctions, - mode: WorkflowExecuteMode, - activation: WorkflowActivateMode, - ) { - await this.runWebhookMethod('delete', webhookData, nodeExecuteFunctions, mode, activation); - } - - private async runWebhookMethod( - method: WebhookSetupMethodNames, - webhookData: IWebhookData, - nodeExecuteFunctions: INodeExecuteFunctions, - mode: WorkflowExecuteMode, - activation: WorkflowActivateMode, - ): Promise { - const node = this.getNode(webhookData.node); - - if (!node) return; - - const nodeType = this.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); - - const webhookFn = nodeType.webhookMethods?.[webhookData.webhookDescription.name]?.[method]; - if (webhookFn === undefined) return; - - const thisArgs = nodeExecuteFunctions.getExecuteHookFunctions( - this, - node, - webhookData.workflowExecuteAdditionalData, - mode, - activation, - webhookData, - ); - - return await webhookFn.call(thisArgs); - } - - /** - * Runs the given trigger node so that it can trigger the workflow - * when the node has data. - * - */ - async runTrigger( - node: INode, - getTriggerFunctions: IGetExecuteTriggerFunctions, - additionalData: IWorkflowExecuteAdditionalData, - mode: WorkflowExecuteMode, - activation: WorkflowActivateMode, - ): Promise { - const triggerFunctions = getTriggerFunctions(this, node, additionalData, mode, activation); - - const nodeType = this.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); - - if (nodeType === undefined) { - throw new ApplicationError('Node with unknown node type', { - extra: { nodeName: node.name }, - tags: { nodeType: node.type }, - }); - } - - if (!nodeType.trigger) { - throw new ApplicationError('Node type does not have a trigger function defined', { - extra: { nodeName: node.name }, - tags: { nodeType: node.type }, - }); - } - - if (mode === 'manual') { - // In manual mode we do not just start the trigger function we also - // want to be able to get informed as soon as the first data got emitted - const triggerResponse = await nodeType.trigger.call(triggerFunctions); - - // Add the manual trigger response which resolves when the first time data got emitted - triggerResponse!.manualTriggerResponse = new Promise((resolve, reject) => { - triggerFunctions.emit = ( - (resolveEmit) => - ( - data: INodeExecutionData[][], - responsePromise?: IDeferredPromise, - donePromise?: IDeferredPromise, - ) => { - additionalData.hooks!.hookFunctions.sendResponse = [ - async (response: IExecuteResponsePromiseData): Promise => { - if (responsePromise) { - responsePromise.resolve(response); - } - }, - ]; - - if (donePromise) { - additionalData.hooks!.hookFunctions.workflowExecuteAfter?.unshift( - async (runData: IRun): Promise => { - return donePromise.resolve(runData); - }, - ); - } - - resolveEmit(data); - } - )(resolve); - triggerFunctions.emitError = ( - (rejectEmit) => - (error: Error, responsePromise?: IDeferredPromise) => { - additionalData.hooks!.hookFunctions.sendResponse = [ - async (): Promise => { - if (responsePromise) { - responsePromise.reject(error); - } - }, - ]; - - rejectEmit(error); - } - )(reject); - }); - - return triggerResponse; - } - // In all other modes simply start the trigger - return await nodeType.trigger.call(triggerFunctions); - } - - /** - * Runs the given trigger node so that it can trigger the workflow - * when the node has data. - * - */ - - async runPoll( - node: INode, - pollFunctions: IPollFunctions, - ): Promise { - const nodeType = this.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); - - if (nodeType === undefined) { - throw new ApplicationError('Node with unknown node type', { - extra: { nodeName: node.name }, - tags: { nodeType: node.type }, - }); - } - - if (!nodeType.poll) { - throw new ApplicationError('Node type does not have a poll function defined', { - extra: { nodeName: node.name }, - tags: { nodeType: node.type }, - }); - } - - return await nodeType.poll.call(pollFunctions); - } - - /** - * Executes the webhook data to see what it should return and if the - * workflow should be started or not - * - */ - async runWebhook( - webhookData: IWebhookData, - node: INode, - additionalData: IWorkflowExecuteAdditionalData, - nodeExecuteFunctions: INodeExecuteFunctions, - mode: WorkflowExecuteMode, - runExecutionData: IRunExecutionData | null, - ): Promise { - const nodeType = this.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); - if (nodeType === undefined) { - throw new ApplicationError('Unknown node type of webhook node', { - extra: { nodeName: node.name }, - }); - } else if (nodeType.webhook === undefined) { - throw new ApplicationError('Node does not have any webhooks defined', { - extra: { nodeName: node.name }, - }); - } - - const closeFunctions: CloseFunction[] = []; - - const context = nodeExecuteFunctions.getExecuteWebhookFunctions( - this, - node, - additionalData, - mode, - webhookData, - closeFunctions, - runExecutionData, - ); - return nodeType instanceof Node - ? await nodeType.webhook(context) - : await nodeType.webhook.call(context); - } - - /** - * Executes the given node. - * - */ - // eslint-disable-next-line complexity - async runNode( - executionData: IExecuteData, - runExecutionData: IRunExecutionData, - runIndex: number, - additionalData: IWorkflowExecuteAdditionalData, - nodeExecuteFunctions: INodeExecuteFunctions, - mode: WorkflowExecuteMode, - abortSignal?: AbortSignal, - ): Promise { - const { node } = executionData; - let inputData = executionData.data; - - if (node.disabled === true) { - // If node is disabled simply pass the data through - // return NodeRunHelpers. - if (inputData.hasOwnProperty('main') && inputData.main.length > 0) { - // If the node is disabled simply return the data from the first main input - if (inputData.main[0] === null) { - return { data: undefined }; - } - return { data: [inputData.main[0]] }; - } - return { data: undefined }; - } - - const nodeType = this.nodeTypes.getByNameAndVersion(node.type, node.typeVersion); - if (nodeType === undefined) { - throw new ApplicationError('Node type is unknown so cannot run it', { - tags: { nodeType: node.type }, - }); - } - - let connectionInputData: INodeExecutionData[] = []; - if (nodeType.execute || (!nodeType.poll && !nodeType.trigger && !nodeType.webhook)) { - // Only stop if first input is empty for execute runs. For all others run anyways - // because then it is a trigger node. As they only pass data through and so the input-data - // becomes output-data it has to be possible. - - if (inputData.main?.length > 0) { - // We always use the data of main input and the first input for execute - connectionInputData = inputData.main[0] as INodeExecutionData[]; - } - - const forceInputNodeExecution = this.settings.executionOrder !== 'v1'; - if (!forceInputNodeExecution) { - // If the nodes do not get force executed data of some inputs may be missing - // for that reason do we use the data of the first one that contains any - for (const mainData of inputData.main) { - if (mainData?.length) { - connectionInputData = mainData; - break; - } - } - } - - if (connectionInputData.length === 0) { - // No data for node so return - return { data: undefined }; - } - } - - if ( - runExecutionData.resultData.lastNodeExecuted === node.name && - runExecutionData.resultData.error !== undefined - ) { - // The node did already fail. So throw an error here that it displays and logs it correctly. - // Does get used by webhook and trigger nodes in case they throw an error that it is possible - // to log the error and display in Editor-UI. - if ( - runExecutionData.resultData.error.name === 'NodeOperationError' || - runExecutionData.resultData.error.name === 'NodeApiError' - ) { - throw runExecutionData.resultData.error; - } - - const error = new Error(runExecutionData.resultData.error.message); - error.stack = runExecutionData.resultData.error.stack; - throw error; - } - - if (node.executeOnce === true) { - // If node should be executed only once so use only the first input item - const newInputData: ITaskDataConnections = {}; - for (const connectionType of Object.keys(inputData)) { - newInputData[connectionType] = inputData[connectionType].map((input) => { - // eslint-disable-next-line @typescript-eslint/prefer-optional-chain - return input && input.slice(0, 1); - }); - } - inputData = newInputData; - } - - if (nodeType.execute) { - const closeFunctions: CloseFunction[] = []; - const context = nodeExecuteFunctions.getExecuteFunctions( - this, - runExecutionData, - runIndex, - connectionInputData, - inputData, - node, - additionalData, - executionData, - mode, - closeFunctions, - abortSignal, - ); - const data = - nodeType instanceof Node - ? await nodeType.execute(context) - : await nodeType.execute.call(context); - - const closeFunctionsResults = await Promise.allSettled( - closeFunctions.map(async (fn) => await fn()), - ); - - const closingErrors = closeFunctionsResults - .filter((result): result is PromiseRejectedResult => result.status === 'rejected') - .map((result) => result.reason); - - if (closingErrors.length > 0) { - if (closingErrors[0] instanceof Error) throw closingErrors[0]; - throw new ApplicationError("Error on execution node's close function(s)", { - extra: { nodeName: node.name }, - tags: { nodeType: node.type }, - cause: closingErrors, - }); - } - - return { data }; - } else if (nodeType.poll) { - if (mode === 'manual') { - // In manual mode run the poll function - const thisArgs = nodeExecuteFunctions.getExecutePollFunctions( - this, - node, - additionalData, - mode, - 'manual', - ); - return { data: await nodeType.poll.call(thisArgs) }; - } - // In any other mode pass data through as it already contains the result of the poll - return { data: inputData.main as INodeExecutionData[][] }; - } else if (nodeType.trigger) { - if (mode === 'manual') { - // In manual mode start the trigger - const triggerResponse = await this.runTrigger( - node, - nodeExecuteFunctions.getExecuteTriggerFunctions, - additionalData, - mode, - 'manual', - ); - - if (triggerResponse === undefined) { - return { data: null }; - } - - let closeFunction; - if (triggerResponse.closeFunction) { - // In manual mode we return the trigger closeFunction. That allows it to be called directly - // but we do not have to wait for it to finish. That is important for things like queue-nodes. - // There the full close will may be delayed till a message gets acknowledged after the execution. - // If we would not be able to wait for it to close would it cause problems with "own" mode as the - // process would be killed directly after it and so the acknowledge would not have been finished yet. - closeFunction = triggerResponse.closeFunction; - - // Manual testing of Trigger nodes creates an execution. If the execution is cancelled, `closeFunction` should be called to cleanup any open connections/consumers - abortSignal?.addEventListener('abort', closeFunction); - } - - if (triggerResponse.manualTriggerFunction !== undefined) { - // If a manual trigger function is defined call it and wait till it did run - await triggerResponse.manualTriggerFunction(); - } - - const response = await triggerResponse.manualTriggerResponse!; - - if (response.length === 0) { - return { data: null, closeFunction }; - } - - return { data: response, closeFunction }; - } - // For trigger nodes in any mode except "manual" do we simply pass the data through - return { data: inputData.main as INodeExecutionData[][] }; - } else if (nodeType.webhook) { - // For webhook nodes always simply pass the data through - return { data: inputData.main as INodeExecutionData[][] }; - } else { - // For nodes which have routing information on properties - - const routingNode = new RoutingNode( - this, - node, - connectionInputData, - runExecutionData ?? null, - additionalData, - mode, - ); - - return { - data: await routingNode.runNode( - inputData, - runIndex, - nodeType, - executionData, - nodeExecuteFunctions, - undefined, - abortSignal, - ), - }; - } - } } function hasDotNotationBannedChar(nodeName: string) { diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index 6a7630722c..992a27921f 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -14,7 +14,6 @@ export * from './MessageEventBus'; export * from './ExecutionStatus'; export * from './Expression'; export * from './NodeHelpers'; -export * from './RoutingNode'; export * from './Workflow'; export * from './WorkflowDataProxy'; export * from './WorkflowDataProxyEnvProvider'; diff --git a/packages/workflow/test/Helpers.ts b/packages/workflow/test/Helpers.ts index f72c041c69..c8b1c200e3 100644 --- a/packages/workflow/test/Helpers.ts +++ b/packages/workflow/test/Helpers.ts @@ -1,70 +1,16 @@ import { readFileSync } from 'fs'; -import { mock } from 'jest-mock-extended'; -import get from 'lodash/get'; import path from 'path'; -import type { - IExecuteSingleFunctions, - IHttpRequestOptions, - IN8nHttpFullResponse, - IN8nHttpResponse, - INode, - INodeTypes, - IRunExecutionData, -} from '@/Interfaces'; -import type { Workflow } from '@/Workflow'; +import type { INodeTypes } from '@/Interfaces'; import { NodeTypes as NodeTypesClass } from './NodeTypes'; -export function getExecuteSingleFunctions( - workflow: Workflow, - runExecutionData: IRunExecutionData, - runIndex: number, - node: INode, - itemIndex: number, -): IExecuteSingleFunctions { - return mock({ - getItemIndex: () => itemIndex, - getNodeParameter: (parameterName: string) => { - return workflow.expression.getParameterValue( - get(node.parameters, parameterName), - runExecutionData, - runIndex, - itemIndex, - node.name, - [], - 'internal', - {}, - ); - }, - getWorkflow: () => ({ - id: workflow.id, - name: workflow.name, - active: workflow.active, - }), - helpers: mock({ - async httpRequest( - requestOptions: IHttpRequestOptions, - ): Promise { - return { - body: { - headers: {}, - statusCode: 200, - requestOptions, - }, - }; - }, - }), - }); -} - let nodeTypesInstance: NodeTypesClass | undefined; export function NodeTypes(): INodeTypes { if (nodeTypesInstance === undefined) { nodeTypesInstance = new NodeTypesClass(); } - return nodeTypesInstance; } diff --git a/packages/workflow/test/NodeHelpers.test.ts b/packages/workflow/test/NodeHelpers.test.ts index 09e036fc94..e56a4e20ac 100644 --- a/packages/workflow/test/NodeHelpers.test.ts +++ b/packages/workflow/test/NodeHelpers.test.ts @@ -9,10 +9,8 @@ import { import { getNodeParameters, getNodeHints, - isSingleExecution, isSubNodeType, applyDeclarativeNodeOptionParameters, - convertNodeToAiTool, } from '@/NodeHelpers'; import type { Workflow } from '@/Workflow'; @@ -3542,34 +3540,6 @@ describe('NodeHelpers', () => { }); }); - describe('isSingleExecution', () => { - test('should determine based on node parameters if it would be executed once', () => { - expect(isSingleExecution('n8n-nodes-base.code', {})).toEqual(true); - expect(isSingleExecution('n8n-nodes-base.code', { mode: 'runOnceForEachItem' })).toEqual( - false, - ); - expect(isSingleExecution('n8n-nodes-base.executeWorkflow', {})).toEqual(true); - expect(isSingleExecution('n8n-nodes-base.executeWorkflow', { mode: 'each' })).toEqual(false); - expect(isSingleExecution('n8n-nodes-base.crateDb', {})).toEqual(true); - expect(isSingleExecution('n8n-nodes-base.crateDb', { operation: 'update' })).toEqual(true); - expect(isSingleExecution('n8n-nodes-base.timescaleDb', {})).toEqual(true); - expect(isSingleExecution('n8n-nodes-base.timescaleDb', { operation: 'update' })).toEqual( - true, - ); - expect(isSingleExecution('n8n-nodes-base.microsoftSql', {})).toEqual(true); - expect(isSingleExecution('n8n-nodes-base.microsoftSql', { operation: 'update' })).toEqual( - true, - ); - expect(isSingleExecution('n8n-nodes-base.microsoftSql', { operation: 'delete' })).toEqual( - true, - ); - expect(isSingleExecution('n8n-nodes-base.questDb', {})).toEqual(true); - expect(isSingleExecution('n8n-nodes-base.mongoDb', { operation: 'insert' })).toEqual(true); - expect(isSingleExecution('n8n-nodes-base.mongoDb', { operation: 'update' })).toEqual(true); - expect(isSingleExecution('n8n-nodes-base.redis', {})).toEqual(true); - }); - }); - describe('isSubNodeType', () => { const tests: Array<[boolean, Pick | null]> = [ [false, null], @@ -3637,177 +3607,4 @@ describe('NodeHelpers', () => { expect(nodeType.description.properties).toEqual([]); }); }); - - describe('convertNodeToAiTool', () => { - let fullNodeWrapper: { description: INodeTypeDescription }; - - beforeEach(() => { - fullNodeWrapper = { - description: { - displayName: 'Test Node', - name: 'testNode', - group: ['test'], - description: 'A test node', - version: 1, - defaults: {}, - inputs: [NodeConnectionType.Main], - outputs: [NodeConnectionType.Main], - properties: [], - }, - }; - }); - - it('should modify the name and displayName correctly', () => { - const result = convertNodeToAiTool(fullNodeWrapper); - expect(result.description.name).toBe('testNodeTool'); - expect(result.description.displayName).toBe('Test Node Tool'); - }); - - it('should update inputs and outputs', () => { - const result = convertNodeToAiTool(fullNodeWrapper); - expect(result.description.inputs).toEqual([]); - expect(result.description.outputs).toEqual([NodeConnectionType.AiTool]); - }); - - it('should remove the usableAsTool property', () => { - fullNodeWrapper.description.usableAsTool = true; - const result = convertNodeToAiTool(fullNodeWrapper); - expect(result.description.usableAsTool).toBeUndefined(); - }); - - it("should add toolDescription property if it doesn't exist", () => { - const result = convertNodeToAiTool(fullNodeWrapper); - const toolDescriptionProp = result.description.properties.find( - (prop) => prop.name === 'toolDescription', - ); - expect(toolDescriptionProp).toBeDefined(); - expect(toolDescriptionProp?.type).toBe('string'); - expect(toolDescriptionProp?.default).toBe(fullNodeWrapper.description.description); - }); - - it('should set codex categories correctly', () => { - const result = convertNodeToAiTool(fullNodeWrapper); - expect(result.description.codex).toEqual({ - categories: ['AI'], - subcategories: { - AI: ['Tools'], - Tools: ['Other Tools'], - }, - resources: {}, - }); - }); - - it('should preserve existing properties', () => { - const existingProp: INodeProperties = { - displayName: 'Existing Prop', - name: 'existingProp', - type: 'string', - default: 'test', - }; - fullNodeWrapper.description.properties = [existingProp]; - const result = convertNodeToAiTool(fullNodeWrapper); - expect(result.description.properties).toHaveLength(3); // Existing prop + toolDescription + notice - expect(result.description.properties).toContainEqual(existingProp); - }); - - it('should handle nodes with resource property', () => { - const resourceProp: INodeProperties = { - displayName: 'Resource', - name: 'resource', - type: 'options', - options: [{ name: 'User', value: 'user' }], - default: 'user', - }; - fullNodeWrapper.description.properties = [resourceProp]; - const result = convertNodeToAiTool(fullNodeWrapper); - expect(result.description.properties[1].name).toBe('descriptionType'); - expect(result.description.properties[2].name).toBe('toolDescription'); - expect(result.description.properties[3]).toEqual(resourceProp); - }); - - it('should handle nodes with operation property', () => { - const operationProp: INodeProperties = { - displayName: 'Operation', - name: 'operation', - type: 'options', - options: [{ name: 'Create', value: 'create' }], - default: 'create', - }; - fullNodeWrapper.description.properties = [operationProp]; - const result = convertNodeToAiTool(fullNodeWrapper); - expect(result.description.properties[1].name).toBe('descriptionType'); - expect(result.description.properties[2].name).toBe('toolDescription'); - expect(result.description.properties[3]).toEqual(operationProp); - }); - - it('should handle nodes with both resource and operation properties', () => { - const resourceProp: INodeProperties = { - displayName: 'Resource', - name: 'resource', - type: 'options', - options: [{ name: 'User', value: 'user' }], - default: 'user', - }; - const operationProp: INodeProperties = { - displayName: 'Operation', - name: 'operation', - type: 'options', - options: [{ name: 'Create', value: 'create' }], - default: 'create', - }; - fullNodeWrapper.description.properties = [resourceProp, operationProp]; - const result = convertNodeToAiTool(fullNodeWrapper); - expect(result.description.properties[1].name).toBe('descriptionType'); - expect(result.description.properties[2].name).toBe('toolDescription'); - expect(result.description.properties[3]).toEqual(resourceProp); - expect(result.description.properties[4]).toEqual(operationProp); - }); - - it('should handle nodes with empty properties', () => { - fullNodeWrapper.description.properties = []; - const result = convertNodeToAiTool(fullNodeWrapper); - expect(result.description.properties).toHaveLength(2); - expect(result.description.properties[1].name).toBe('toolDescription'); - }); - - it('should handle nodes with existing codex property', () => { - fullNodeWrapper.description.codex = { - categories: ['Existing'], - subcategories: { - Existing: ['Category'], - }, - resources: { - primaryDocumentation: [{ url: 'https://example.com' }], - }, - }; - const result = convertNodeToAiTool(fullNodeWrapper); - expect(result.description.codex).toEqual({ - categories: ['AI'], - subcategories: { - AI: ['Tools'], - Tools: ['Other Tools'], - }, - resources: { - primaryDocumentation: [{ url: 'https://example.com' }], - }, - }); - }); - - it('should handle nodes with very long names', () => { - fullNodeWrapper.description.name = 'veryLongNodeNameThatExceedsNormalLimits'.repeat(10); - fullNodeWrapper.description.displayName = - 'Very Long Node Name That Exceeds Normal Limits'.repeat(10); - const result = convertNodeToAiTool(fullNodeWrapper); - expect(result.description.name.endsWith('Tool')).toBe(true); - expect(result.description.displayName.endsWith('Tool')).toBe(true); - }); - - it('should handle nodes with special characters in name and displayName', () => { - fullNodeWrapper.description.name = 'special@#$%Node'; - fullNodeWrapper.description.displayName = 'Special @#$% Node'; - const result = convertNodeToAiTool(fullNodeWrapper); - expect(result.description.name).toBe('special@#$%NodeTool'); - expect(result.description.displayName).toBe('Special @#$% Node Tool'); - }); - }); }); diff --git a/packages/workflow/test/Workflow.test.ts b/packages/workflow/test/Workflow.test.ts index ded592c9a6..88c41e8d9f 100644 --- a/packages/workflow/test/Workflow.test.ts +++ b/packages/workflow/test/Workflow.test.ts @@ -6,22 +6,14 @@ import type { IBinaryKeyData, IConnections, IDataObject, - IExecuteData, INode, - INodeExecuteFunctions, INodeExecutionData, INodeParameters, - INodeType, - INodeTypeDescription, INodeTypes, IRunExecutionData, - ITriggerFunctions, - ITriggerResponse, - IWorkflowExecuteAdditionalData, NodeParameterValueType, } from '@/Interfaces'; -import * as NodeHelpers from '@/NodeHelpers'; -import { Workflow, type WorkflowParameters } from '@/Workflow'; +import { Workflow } from '@/Workflow'; process.env.TEST_VARIABLE_1 = 'valueEnvVariable1'; @@ -33,126 +25,6 @@ interface StubNode { } describe('Workflow', () => { - describe('checkIfWorkflowCanBeActivated', () => { - const disabledNode = mock({ type: 'triggerNode', disabled: true }); - const unknownNode = mock({ type: 'unknownNode' }); - const noTriggersNode = mock({ type: 'noTriggersNode' }); - const pollNode = mock({ type: 'pollNode' }); - const triggerNode = mock({ type: 'triggerNode' }); - const webhookNode = mock({ type: 'webhookNode' }); - - const nodeTypes = mock(); - nodeTypes.getByNameAndVersion.mockImplementation((type) => { - // TODO: getByNameAndVersion signature needs to be updated to allow returning undefined - if (type === 'unknownNode') return undefined as unknown as INodeType; - const partial: Partial = { - poll: undefined, - trigger: undefined, - webhook: undefined, - description: mock({ - properties: [], - }), - }; - if (type === 'pollNode') partial.poll = jest.fn(); - if (type === 'triggerNode') partial.trigger = jest.fn(); - if (type === 'webhookNode') partial.webhook = jest.fn(); - return mock(partial); - }); - - test.each([ - ['should skip disabled nodes', disabledNode, [], false], - ['should skip nodes marked as ignored', triggerNode, ['triggerNode'], false], - ['should skip unknown nodes', unknownNode, [], false], - ['should skip nodes with no trigger method', noTriggersNode, [], false], - ['should activate if poll method exists', pollNode, [], true], - ['should activate if trigger method exists', triggerNode, [], true], - ['should activate if webhook method exists', webhookNode, [], true], - ])('%s', async (_, node, ignoredNodes, expected) => { - const params = mock({ nodeTypes }); - params.nodes = [node]; - const workflow = new Workflow(params); - expect(workflow.checkIfWorkflowCanBeActivated(ignoredNodes)).toBe(expected); - }); - }); - - describe('checkReadyForExecution', () => { - const disabledNode = mock({ name: 'Disabled Node', disabled: true }); - const startNode = mock({ name: 'Start Node' }); - const unknownNode = mock({ name: 'Unknown Node', type: 'unknownNode' }); - - const nodeParamIssuesSpy = jest.spyOn(NodeHelpers, 'getNodeParametersIssues'); - - const nodeTypes = mock(); - nodeTypes.getByNameAndVersion.mockImplementation((type) => { - // TODO: getByNameAndVersion signature needs to be updated to allow returning undefined - if (type === 'unknownNode') return undefined as unknown as INodeType; - return mock({ - description: { - properties: [], - }, - }); - }); - - beforeEach(() => jest.clearAllMocks()); - - it('should return null if there are no nodes', () => { - const workflow = new Workflow({ - nodes: [], - connections: {}, - active: false, - nodeTypes, - }); - - const issues = workflow.checkReadyForExecution(); - expect(issues).toBe(null); - expect(nodeTypes.getByNameAndVersion).not.toHaveBeenCalled(); - expect(nodeParamIssuesSpy).not.toHaveBeenCalled(); - }); - - it('should return null if there are no enabled nodes', () => { - const workflow = new Workflow({ - nodes: [disabledNode], - connections: {}, - active: false, - nodeTypes, - }); - - const issues = workflow.checkReadyForExecution({ startNode: disabledNode.name }); - expect(issues).toBe(null); - expect(nodeTypes.getByNameAndVersion).toHaveBeenCalledTimes(1); - expect(nodeParamIssuesSpy).not.toHaveBeenCalled(); - }); - - it('should return typeUnknown for unknown nodes', () => { - const workflow = new Workflow({ - nodes: [unknownNode], - connections: {}, - active: false, - nodeTypes, - }); - - const issues = workflow.checkReadyForExecution({ startNode: unknownNode.name }); - expect(issues).toEqual({ [unknownNode.name]: { typeUnknown: true } }); - expect(nodeTypes.getByNameAndVersion).toHaveBeenCalledTimes(2); - expect(nodeParamIssuesSpy).not.toHaveBeenCalled(); - }); - - it('should return issues for regular nodes', () => { - const workflow = new Workflow({ - nodes: [startNode], - connections: {}, - active: false, - nodeTypes, - }); - nodeParamIssuesSpy.mockReturnValue({ execution: false }); - - const issues = workflow.checkReadyForExecution({ startNode: startNode.name }); - expect(issues).toEqual({ [startNode.name]: { execution: false } }); - expect(nodeTypes.getByNameAndVersion).toHaveBeenCalledTimes(2); - expect(nodeParamIssuesSpy).toHaveBeenCalled(); - }); - }); - describe('renameNodeInParameterValue', () => { describe('for expressions', () => { const tests = [ @@ -2023,69 +1895,6 @@ describe('Workflow', () => { }); }); - describe('runNode', () => { - const nodeTypes = mock(); - const triggerNode = mock(); - const triggerResponse = mock({ - closeFunction: jest.fn(), - // This node should never trigger, or return - manualTriggerFunction: async () => await new Promise(() => {}), - }); - const triggerNodeType = mock({ - description: { - properties: [], - }, - execute: undefined, - poll: undefined, - webhook: undefined, - async trigger(this: ITriggerFunctions) { - return triggerResponse; - }, - }); - - nodeTypes.getByNameAndVersion.mockReturnValue(triggerNodeType); - - const workflow = new Workflow({ - nodeTypes, - nodes: [triggerNode], - connections: {}, - active: false, - }); - - const executionData = mock(); - const runExecutionData = mock(); - const additionalData = mock(); - const nodeExecuteFunctions = mock(); - const triggerFunctions = mock(); - nodeExecuteFunctions.getExecuteTriggerFunctions.mockReturnValue(triggerFunctions); - const abortController = new AbortController(); - - test('should call closeFunction when manual trigger is aborted', async () => { - const runPromise = workflow.runNode( - executionData, - runExecutionData, - 0, - additionalData, - nodeExecuteFunctions, - 'manual', - abortController.signal, - ); - // Yield back to the event-loop to let async parts of `runNode` execute - await new Promise((resolve) => setImmediate(resolve)); - - let isSettled = false; - void runPromise.then(() => { - isSettled = true; - }); - expect(isSettled).toBe(false); - expect(abortController.signal.aborted).toBe(false); - expect(triggerResponse.closeFunction).not.toHaveBeenCalled(); - - abortController.abort(); - expect(triggerResponse.closeFunction).toHaveBeenCalled(); - }); - }); - describe('__getConnectionsByDestination', () => { it('should return empty object when there are no connections', () => { const workflow = new Workflow({