From cade9b2d91dbba740264eca04f6e99869eff429d Mon Sep 17 00:00:00 2001 From: Val <68596159+valya@users.noreply.github.com> Date: Tue, 22 Oct 2024 15:19:32 +0100 Subject: [PATCH] feat: Add proper INodeTypes polyfill for Task Runner (no-changelog) (#11333) --- .../src/__tests__/node-types.test.ts | 66 +++++++++++++++++++ .../src/js-task-runner/js-task-runner.ts | 13 +--- packages/@n8n/task-runner/src/node-types.ts | 64 ++++++++++++++++++ packages/@n8n/task-runner/src/runner-types.ts | 10 ++- packages/@n8n/task-runner/src/task-runner.ts | 13 +++- .../src/runners/__tests__/task-broker.test.ts | 17 ++++- packages/cli/src/runners/runner-types.ts | 10 ++- packages/cli/src/runners/runner-ws-server.ts | 2 - .../cli/src/runners/task-broker.service.ts | 28 +++++++- 9 files changed, 202 insertions(+), 21 deletions(-) create mode 100644 packages/@n8n/task-runner/src/__tests__/node-types.test.ts create mode 100644 packages/@n8n/task-runner/src/node-types.ts diff --git a/packages/@n8n/task-runner/src/__tests__/node-types.test.ts b/packages/@n8n/task-runner/src/__tests__/node-types.test.ts new file mode 100644 index 0000000000..c102c80df3 --- /dev/null +++ b/packages/@n8n/task-runner/src/__tests__/node-types.test.ts @@ -0,0 +1,66 @@ +import type { INodeTypeDescription } from 'n8n-workflow'; + +import { TaskRunnerNodeTypes } from '../node-types'; + +const SINGLE_VERSIONED = { name: 'single-versioned', version: 1 }; + +const SINGLE_UNVERSIONED = { name: 'single-unversioned' }; + +const MULTI_VERSIONED = { name: 'multi-versioned', version: [1, 2] }; + +const SPLIT_VERSIONED = [ + { name: 'split-versioned', version: 1 }, + { name: 'split-versioned', version: 2 }, +]; + +const TYPES: INodeTypeDescription[] = [ + SINGLE_VERSIONED, + SINGLE_UNVERSIONED, + MULTI_VERSIONED, + ...SPLIT_VERSIONED, +] as INodeTypeDescription[]; + +describe('TaskRunnerNodeTypes', () => { + describe('getByNameAndVersion', () => { + let nodeTypes: TaskRunnerNodeTypes; + + beforeEach(() => { + nodeTypes = new TaskRunnerNodeTypes(TYPES); + }); + + it('should return undefined if not found', () => { + expect(nodeTypes.getByNameAndVersion('unknown', 1)).toBeUndefined(); + }); + + it('should return highest versioned node type if no version is given', () => { + expect(nodeTypes.getByNameAndVersion('split-versioned')).toEqual({ + description: SPLIT_VERSIONED[1], + }); + }); + + it('should return specified version for split version', () => { + expect(nodeTypes.getByNameAndVersion('split-versioned', 1)).toEqual({ + description: SPLIT_VERSIONED[0], + }); + }); + + it('should return undefined on unknown version', () => { + expect(nodeTypes.getByNameAndVersion('split-versioned', 3)).toBeUndefined(); + }); + + it('should return specified version for multi version', () => { + expect(nodeTypes.getByNameAndVersion('multi-versioned', 1)).toEqual({ + description: MULTI_VERSIONED, + }); + expect(nodeTypes.getByNameAndVersion('multi-versioned', 2)).toEqual({ + description: MULTI_VERSIONED, + }); + }); + + it('should default to DEFAULT_NODETYPE_VERSION if no version specified', () => { + expect(nodeTypes.getByNameAndVersion('single-unversioned', 1)).toEqual({ + description: SINGLE_UNVERSIONED, + }); + }); + }); +}); diff --git a/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts b/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts index cf53ba425c..8693fcc4cd 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts @@ -7,7 +7,6 @@ import { import type { CodeExecutionMode, INode, - INodeType, ITaskDataConnections, IWorkflowExecuteAdditionalData, WorkflowParameters, @@ -129,17 +128,7 @@ export class JsTaskRunner extends TaskRunner { const workflowParams = allData.workflow; const workflow = new Workflow({ ...workflowParams, - nodeTypes: { - getByNameAndVersion() { - return undefined as unknown as INodeType; - }, - getByName() { - return undefined as unknown as INodeType; - }, - getKnownTypes() { - return {}; - }, - }, + nodeTypes: this.nodeTypes, }); const customConsole = { diff --git a/packages/@n8n/task-runner/src/node-types.ts b/packages/@n8n/task-runner/src/node-types.ts new file mode 100644 index 0000000000..046321bff9 --- /dev/null +++ b/packages/@n8n/task-runner/src/node-types.ts @@ -0,0 +1,64 @@ +import { + ApplicationError, + type IDataObject, + type INodeType, + type INodeTypeDescription, + type INodeTypes, + type IVersionedNodeType, +} from 'n8n-workflow'; + +type VersionedTypes = Map; + +export const DEFAULT_NODETYPE_VERSION = 1; + +export class TaskRunnerNodeTypes implements INodeTypes { + private nodeTypesByVersion: Map; + + constructor(nodeTypes: INodeTypeDescription[]) { + this.nodeTypesByVersion = this.parseNodeTypes(nodeTypes); + } + + private parseNodeTypes(nodeTypes: INodeTypeDescription[]): Map { + const versionedTypes = new Map(); + + for (const nt of nodeTypes) { + const versions = Array.isArray(nt.version) + ? nt.version + : [nt.version ?? DEFAULT_NODETYPE_VERSION]; + + const versioned: VersionedTypes = + versionedTypes.get(nt.name) ?? new Map(); + for (const version of versions) { + versioned.set(version, { ...versioned.get(version), ...nt }); + } + + versionedTypes.set(nt.name, versioned); + } + + return versionedTypes; + } + + // This isn't used in Workflow from what I can see + getByName(_nodeType: string): INodeType | IVersionedNodeType { + throw new ApplicationError('Unimplemented `getByName`', { level: 'error' }); + } + + getByNameAndVersion(nodeType: string, version?: number): INodeType { + const versions = this.nodeTypesByVersion.get(nodeType); + if (!versions) { + return undefined as unknown as INodeType; + } + const nodeVersion = versions.get(version ?? Math.max(...versions.keys())); + if (!nodeVersion) { + return undefined as unknown as INodeType; + } + return { + description: nodeVersion, + }; + } + + // This isn't used in Workflow from what I can see + getKnownTypes(): IDataObject { + throw new ApplicationError('Unimplemented `getKnownTypes`', { level: 'error' }); + } +} diff --git a/packages/@n8n/task-runner/src/runner-types.ts b/packages/@n8n/task-runner/src/runner-types.ts index 27b4e9a76c..1e84843653 100644 --- a/packages/@n8n/task-runner/src/runner-types.ts +++ b/packages/@n8n/task-runner/src/runner-types.ts @@ -1,4 +1,4 @@ -import type { INodeExecutionData } from 'n8n-workflow'; +import type { INodeExecutionData, INodeTypeBaseDescription } from 'n8n-workflow'; export type DataRequestType = 'input' | 'node' | 'all'; @@ -50,6 +50,11 @@ export namespace N8nMessage { data: unknown; } + export interface NodeTypes { + type: 'broker:nodetypes'; + nodeTypes: INodeTypeBaseDescription[]; + } + export type All = | InfoRequest | TaskOfferAccept @@ -57,7 +62,8 @@ export namespace N8nMessage { | TaskSettings | RunnerRegistered | RPCResponse - | TaskDataResponse; + | TaskDataResponse + | NodeTypes; } export namespace ToRequester { diff --git a/packages/@n8n/task-runner/src/task-runner.ts b/packages/@n8n/task-runner/src/task-runner.ts index ac8378636a..952ffcf571 100644 --- a/packages/@n8n/task-runner/src/task-runner.ts +++ b/packages/@n8n/task-runner/src/task-runner.ts @@ -1,8 +1,9 @@ -import { ApplicationError } from 'n8n-workflow'; +import { ApplicationError, type INodeTypeDescription } from 'n8n-workflow'; import { nanoid } from 'nanoid'; import { URL } from 'node:url'; import { type MessageEvent, WebSocket } from 'ws'; +import { TaskRunnerNodeTypes } from './node-types'; import { RPC_ALLOW_LIST, type RunnerMessage, @@ -58,6 +59,8 @@ export abstract class TaskRunner { rpcCalls: Map = new Map(); + nodeTypes: TaskRunnerNodeTypes = new TaskRunnerNodeTypes([]); + constructor( public taskType: string, wsUrl: string, @@ -158,9 +161,17 @@ export abstract class TaskRunner { break; case 'broker:rpcresponse': this.handleRpcResponse(message.callId, message.status, message.data); + break; + case 'broker:nodetypes': + this.setNodeTypes(message.nodeTypes as unknown as INodeTypeDescription[]); + break; } } + setNodeTypes(nodeTypes: INodeTypeDescription[]) { + this.nodeTypes = new TaskRunnerNodeTypes(nodeTypes); + } + processDataResponse(requestId: string, data: unknown) { const request = this.dataRequests.get(requestId); if (!request) { diff --git a/packages/cli/src/runners/__tests__/task-broker.test.ts b/packages/cli/src/runners/__tests__/task-broker.test.ts index 5d627ba341..4a226c5b98 100644 --- a/packages/cli/src/runners/__tests__/task-broker.test.ts +++ b/packages/cli/src/runners/__tests__/task-broker.test.ts @@ -11,7 +11,7 @@ describe('TaskBroker', () => { let taskBroker: TaskBroker; beforeEach(() => { - taskBroker = new TaskBroker(mock()); + taskBroker = new TaskBroker(mock(), mock()); jest.restoreAllMocks(); }); @@ -69,6 +69,21 @@ describe('TaskBroker', () => { expect(knownRunners.get(runnerId)?.runner).toEqual(runner); expect(knownRunners.get(runnerId)?.messageCallback).toEqual(messageCallback); }); + + it('should send node types to runner', () => { + const runnerId = 'runner1'; + const runner = mock({ id: runnerId }); + const messageCallback = jest.fn(); + + taskBroker.registerRunner(runner, messageCallback); + + expect(messageCallback).toBeCalledWith({ + type: 'broker:nodetypes', + // We're mocking the node types service, so this will + // be undefined. + nodeType: undefined, + }); + }); }); describe('registerRequester', () => { diff --git a/packages/cli/src/runners/runner-types.ts b/packages/cli/src/runners/runner-types.ts index f615754e02..8b205de801 100644 --- a/packages/cli/src/runners/runner-types.ts +++ b/packages/cli/src/runners/runner-types.ts @@ -1,5 +1,5 @@ import type { Response } from 'express'; -import type { INodeExecutionData } from 'n8n-workflow'; +import type { INodeExecutionData, INodeTypeBaseDescription } from 'n8n-workflow'; import type WebSocket from 'ws'; import type { TaskRunner } from './task-broker.service'; @@ -62,6 +62,11 @@ export namespace N8nMessage { data: unknown; } + export interface NodeTypes { + type: 'broker:nodetypes'; + nodeTypes: INodeTypeBaseDescription[]; + } + export type All = | InfoRequest | TaskOfferAccept @@ -69,7 +74,8 @@ export namespace N8nMessage { | TaskSettings | RunnerRegistered | RPCResponse - | TaskDataResponse; + | TaskDataResponse + | NodeTypes; } export namespace ToRequester { diff --git a/packages/cli/src/runners/runner-ws-server.ts b/packages/cli/src/runners/runner-ws-server.ts index ef9e52f5f5..4e19299cee 100644 --- a/packages/cli/src/runners/runner-ws-server.ts +++ b/packages/cli/src/runners/runner-ws-server.ts @@ -87,8 +87,6 @@ export class TaskRunnerService { this.sendMessage.bind(this, id) as MessageCallback, ); - this.sendMessage(id, { type: 'broker:runnerregistered' }); - this.logger.info(`Runner "${message.name}"(${id}) has been registered`); return; } diff --git a/packages/cli/src/runners/task-broker.service.ts b/packages/cli/src/runners/task-broker.service.ts index a63cbbda21..40ad6d6e90 100644 --- a/packages/cli/src/runners/task-broker.service.ts +++ b/packages/cli/src/runners/task-broker.service.ts @@ -2,6 +2,7 @@ import { ApplicationError } from 'n8n-workflow'; import { nanoid } from 'nanoid'; import { Service } from 'typedi'; +import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials'; import { Logger } from '@/logging/logger.service'; import { TaskRejectError } from './errors'; @@ -71,7 +72,19 @@ export class TaskBroker { private pendingTaskRequests: TaskRequest[] = []; - constructor(private readonly logger: Logger) {} + constructor( + private readonly logger: Logger, + private readonly loadNodesAndCredentials: LoadNodesAndCredentials, + ) { + this.loadNodesAndCredentials.addPostProcessor(this.updateNodeTypes); + } + + updateNodeTypes = async () => { + await this.messageAllRunners({ + type: 'broker:nodetypes', + nodeTypes: this.loadNodesAndCredentials.types.nodes, + }); + }; expireTasks() { const now = process.hrtime.bigint(); @@ -84,6 +97,11 @@ export class TaskBroker { registerRunner(runner: TaskRunner, messageCallback: MessageCallback) { this.knownRunners.set(runner.id, { runner, messageCallback }); + void this.knownRunners.get(runner.id)!.messageCallback({ type: 'broker:runnerregistered' }); + void this.knownRunners.get(runner.id)!.messageCallback({ + type: 'broker:nodetypes', + nodeTypes: this.loadNodesAndCredentials.types.nodes, + }); } deregisterRunner(runnerId: string) { @@ -117,6 +135,14 @@ export class TaskBroker { await this.knownRunners.get(runnerId)?.messageCallback(message); } + private async messageAllRunners(message: N8nMessage.ToRunner.All) { + await Promise.allSettled( + [...this.knownRunners.values()].map(async (runner) => { + await runner.messageCallback(message); + }), + ); + } + private async messageRequester(requesterId: string, message: N8nMessage.ToRequester.All) { await this.requesters.get(requesterId)?.(message); }