diff --git a/packages/@n8n/task-runner/src/__tests__/node-types.test.ts b/packages/@n8n/task-runner/src/__tests__/node-types.test.ts index c102c80df3..c535bb0147 100644 --- a/packages/@n8n/task-runner/src/__tests__/node-types.test.ts +++ b/packages/@n8n/task-runner/src/__tests__/node-types.test.ts @@ -63,4 +63,35 @@ describe('TaskRunnerNodeTypes', () => { }); }); }); + + describe('addNodeTypeDescriptions', () => { + it('should add new node types', () => { + const nodeTypes = new TaskRunnerNodeTypes(TYPES); + + const nodeTypeDescriptions = [ + { name: 'new-type', version: 1 }, + { name: 'new-type', version: 2 }, + ] as INodeTypeDescription[]; + + nodeTypes.addNodeTypeDescriptions(nodeTypeDescriptions); + + expect(nodeTypes.getByNameAndVersion('new-type', 1)).toEqual({ + description: { name: 'new-type', version: 1 }, + }); + expect(nodeTypes.getByNameAndVersion('new-type', 2)).toEqual({ + description: { name: 'new-type', version: 2 }, + }); + }); + }); + + describe('onlyUnknown', () => { + it('should return only unknown node types', () => { + const nodeTypes = new TaskRunnerNodeTypes(TYPES); + + const candidate = { name: 'unknown', version: 1 }; + + expect(nodeTypes.onlyUnknown([candidate])).toEqual([candidate]); + expect(nodeTypes.onlyUnknown([SINGLE_VERSIONED])).toEqual([]); + }); + }); }); diff --git a/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts b/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts index e0bebe0521..7e79bba73b 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts @@ -17,6 +17,7 @@ import type { IRunExecutionData, WorkflowExecuteMode, EnvProviderState, + INodeTypeDescription, } from 'n8n-workflow'; import * as a from 'node:assert'; import { runInNewContext, type Context } from 'node:vm'; @@ -119,6 +120,29 @@ export class JsTaskRunner extends TaskRunner { neededBuiltIns.toDataRequestParams(), ); + /** + * We request node types only when we know a task needs all nodes, because + * needing all nodes means that the task relies on paired item functionality, + * which is the same requirement for needing node types. + */ + if (neededBuiltIns.needsAllNodes) { + const uniqueNodeTypes = new Map( + data.workflow.nodes.map((node) => [ + `${node.type}|${node.typeVersion}`, + { name: node.type, version: node.typeVersion }, + ]), + ); + + const unknownNodeTypes = this.nodeTypes.onlyUnknown([...uniqueNodeTypes.values()]); + + const nodeTypes = await this.requestNodeTypes( + task.taskId, + unknownNodeTypes, + ); + + this.nodeTypes.addNodeTypeDescriptions(nodeTypes); + } + const workflowParams = data.workflow; const workflow = new Workflow({ ...workflowParams, diff --git a/packages/@n8n/task-runner/src/message-types.ts b/packages/@n8n/task-runner/src/message-types.ts index 95c5f5b72f..b5f17f965e 100644 --- a/packages/@n8n/task-runner/src/message-types.ts +++ b/packages/@n8n/task-runner/src/message-types.ts @@ -1,6 +1,11 @@ import type { INodeTypeBaseDescription } from 'n8n-workflow'; -import type { RPC_ALLOW_LIST, TaskDataRequestParams, TaskResultData } from './runner-types'; +import type { + NeededNodeType, + RPC_ALLOW_LIST, + TaskDataRequestParams, + TaskResultData, +} from './runner-types'; export namespace BrokerMessage { export namespace ToRunner { @@ -47,6 +52,8 @@ export namespace BrokerMessage { export interface NodeTypes { type: 'broker:nodetypes'; + taskId: string; + requestId: string; nodeTypes: INodeTypeBaseDescription[]; } @@ -87,6 +94,13 @@ export namespace BrokerMessage { requestParams: TaskDataRequestParams; } + export interface NodeTypesRequest { + type: 'broker:nodetypesrequest'; + taskId: string; + requestId: string; + requestParams: NeededNodeType[]; + } + export interface RPC { type: 'broker:rpc'; callId: string; @@ -95,7 +109,7 @@ export namespace BrokerMessage { params: unknown[]; } - export type All = TaskReady | TaskDone | TaskError | TaskDataRequest | RPC; + export type All = TaskReady | TaskDone | TaskError | TaskDataRequest | NodeTypesRequest | RPC; } } @@ -120,6 +134,13 @@ export namespace RequesterMessage { data: unknown; } + export interface NodeTypesResponse { + type: 'requester:nodetypesresponse'; + taskId: string; + requestId: string; + nodeTypes: INodeTypeBaseDescription[]; + } + export interface RPCResponse { type: 'requester:rpcresponse'; taskId: string; @@ -134,7 +155,13 @@ export namespace RequesterMessage { taskType: string; } - export type All = TaskSettings | TaskCancel | RPCResponse | TaskDataResponse | TaskRequest; + export type All = + | TaskSettings + | TaskCancel + | RPCResponse + | TaskDataResponse + | NodeTypesResponse + | TaskRequest; } } @@ -183,6 +210,25 @@ export namespace RunnerMessage { requestParams: TaskDataRequestParams; } + export interface NodeTypesRequest { + type: 'runner:nodetypesrequest'; + taskId: string; + requestId: string; + + /** + * Which node types should be included in the runner's node types request. + * + * Node types are needed only when the script relies on paired item functionality. + * If so, we need only the node types not already cached in the runner. + * + * TODO: In future we can trim this down to only node types in the paired item chain, + * rather than assuming we need all node types in the workflow. + * + * @example [{ name: 'n8n-nodes-base.httpRequest', version: 1 }] + */ + requestParams: NeededNodeType[]; + } + export interface RPC { type: 'runner:rpc'; callId: string; @@ -199,6 +245,7 @@ export namespace RunnerMessage { | TaskRejected | TaskOffer | RPC - | TaskDataRequest; + | TaskDataRequest + | NodeTypesRequest; } } diff --git a/packages/@n8n/task-runner/src/node-types.ts b/packages/@n8n/task-runner/src/node-types.ts index 046321bff9..8f910134b5 100644 --- a/packages/@n8n/task-runner/src/node-types.ts +++ b/packages/@n8n/task-runner/src/node-types.ts @@ -7,6 +7,8 @@ import { type IVersionedNodeType, } from 'n8n-workflow'; +import type { NeededNodeType } from './runner-types'; + type VersionedTypes = Map; export const DEFAULT_NODETYPE_VERSION = 1; @@ -61,4 +63,30 @@ export class TaskRunnerNodeTypes implements INodeTypes { getKnownTypes(): IDataObject { throw new ApplicationError('Unimplemented `getKnownTypes`', { level: 'error' }); } + + addNodeTypeDescriptions(nodeTypeDescriptions: INodeTypeDescription[]) { + const newNodeTypes = this.parseNodeTypes(nodeTypeDescriptions); + + for (const [name, newVersions] of newNodeTypes.entries()) { + if (!this.nodeTypesByVersion.has(name)) { + this.nodeTypesByVersion.set(name, newVersions); + } else { + const existingVersions = this.nodeTypesByVersion.get(name)!; + for (const [version, nodeType] of newVersions.entries()) { + existingVersions.set(version, nodeType); + } + } + } + } + + /** Filter out node type versions that are already registered. */ + onlyUnknown(nodeTypes: NeededNodeType[]) { + return nodeTypes.filter(({ name, version }) => { + const existingVersions = this.nodeTypesByVersion.get(name); + + if (!existingVersions) return true; + + return !existingVersions.has(version); + }); + } } diff --git a/packages/@n8n/task-runner/src/runner-types.ts b/packages/@n8n/task-runner/src/runner-types.ts index c55b50bf4c..836c42ed49 100644 --- a/packages/@n8n/task-runner/src/runner-types.ts +++ b/packages/@n8n/task-runner/src/runner-types.ts @@ -112,3 +112,6 @@ export const RPC_ALLOW_LIST = [ 'helpers.httpRequest', 'logNodeOutput', ] as const; + +/** Node types needed for the runner to execute a task. */ +export type NeededNodeType = { name: string; version: number }; diff --git a/packages/@n8n/task-runner/src/task-runner.ts b/packages/@n8n/task-runner/src/task-runner.ts index 81de93e6b0..4bd8661daa 100644 --- a/packages/@n8n/task-runner/src/task-runner.ts +++ b/packages/@n8n/task-runner/src/task-runner.ts @@ -1,4 +1,4 @@ -import { ApplicationError, type INodeTypeDescription } from 'n8n-workflow'; +import { ApplicationError } from 'n8n-workflow'; import { nanoid } from 'nanoid'; import { type MessageEvent, WebSocket } from 'ws'; @@ -25,6 +25,12 @@ interface DataRequest { reject: (error: unknown) => void; } +interface NodeTypesRequest { + requestId: string; + resolve: (data: unknown) => void; + reject: (error: unknown) => void; +} + interface RPCCall { callId: string; resolve: (data: unknown) => void; @@ -58,6 +64,8 @@ export abstract class TaskRunner { dataRequests: Map = new Map(); + nodeTypesRequests: Map = new Map(); + rpcCalls: Map = new Map(); nodeTypes: TaskRunnerNodeTypes = new TaskRunnerNodeTypes([]); @@ -168,15 +176,11 @@ export abstract class TaskRunner { this.handleRpcResponse(message.callId, message.status, message.data); break; case 'broker:nodetypes': - this.setNodeTypes(message.nodeTypes as unknown as INodeTypeDescription[]); + this.processNodeTypesResponse(message.requestId, message.nodeTypes); break; } } - setNodeTypes(nodeTypes: INodeTypeDescription[]) { - this.nodeTypes = new TaskRunnerNodeTypes(nodeTypes); - } - processDataResponse(requestId: string, data: unknown) { const request = this.dataRequests.get(requestId); if (!request) { @@ -187,6 +191,16 @@ export abstract class TaskRunner { request.resolve(data); } + processNodeTypesResponse(requestId: string, nodeTypes: unknown) { + const request = this.nodeTypesRequests.get(requestId); + + if (!request) return; + + // Deleting of the request is handled in `requestNodeTypes`, using a + // `finally` wrapped around the return + request.resolve(nodeTypes); + } + hasOpenTasks() { return Object.values(this.runningTasks).length < this.maxConcurrency; } @@ -282,6 +296,34 @@ export abstract class TaskRunner { throw new ApplicationError('Unimplemented'); } + async requestNodeTypes( + taskId: Task['taskId'], + requestParams: RunnerMessage.ToBroker.NodeTypesRequest['requestParams'], + ) { + const requestId = nanoid(); + + const nodeTypesPromise = new Promise((resolve, reject) => { + this.nodeTypesRequests.set(requestId, { + requestId, + resolve: resolve as (data: unknown) => void, + reject, + }); + }); + + this.send({ + type: 'runner:nodetypesrequest', + taskId, + requestId, + requestParams, + }); + + try { + return await nodeTypesPromise; + } finally { + this.nodeTypesRequests.delete(requestId); + } + } + async requestData( taskId: Task['taskId'], requestParams: RunnerMessage.ToBroker.TaskDataRequest['requestParams'], diff --git a/packages/cli/src/node-types.ts b/packages/cli/src/node-types.ts index 26b1b61e36..553aedd620 100644 --- a/packages/cli/src/node-types.ts +++ b/packages/cli/src/node-types.ts @@ -1,3 +1,4 @@ +import type { NeededNodeType } from '@n8n/task-runner'; import type { Dirent } from 'fs'; import { readdir } from 'fs/promises'; import { loadClassInIsolation } from 'n8n-core'; @@ -149,4 +150,22 @@ export class NodeTypes implements INodeTypes { dirent.name.toLowerCase().startsWith('v') ); } + + getNodeTypeDescriptions(nodeTypes: NeededNodeType[]): INodeTypeDescription[] { + return nodeTypes.map(({ name: nodeTypeName, version: nodeTypeVersion }) => { + const nodeType = this.getNode(nodeTypeName); + + if (!nodeType) throw new ApplicationError(`Unknown node type: ${nodeTypeName}`); + + const { description } = NodeHelpers.getVersionedNodeType(nodeType.type, nodeTypeVersion); + + const descriptionCopy = { ...description }; + + descriptionCopy.name = descriptionCopy.name.startsWith('n8n-nodes') + ? descriptionCopy.name + : `n8n-nodes-base.${descriptionCopy.name}`; // nodes-base nodes are unprefixed + + return descriptionCopy; + }); + } } diff --git a/packages/cli/src/runners/__tests__/task-broker.test.ts b/packages/cli/src/runners/__tests__/task-broker.test.ts index 715c5d8eb8..614d04c3b5 100644 --- a/packages/cli/src/runners/__tests__/task-broker.test.ts +++ b/packages/cli/src/runners/__tests__/task-broker.test.ts @@ -1,5 +1,6 @@ import type { RunnerMessage, TaskResultData } from '@n8n/task-runner'; import { mock } from 'jest-mock-extended'; +import type { INodeTypeBaseDescription } from 'n8n-workflow'; import { TaskRejectError } from '../errors'; import { TaskBroker } from '../task-broker.service'; @@ -11,7 +12,7 @@ describe('TaskBroker', () => { let taskBroker: TaskBroker; beforeEach(() => { - taskBroker = new TaskBroker(mock(), mock()); + taskBroker = new TaskBroker(mock()); jest.restoreAllMocks(); }); @@ -76,13 +77,6 @@ describe('TaskBroker', () => { const messageCallback = jest.fn(); taskBroker.registerRunner(runner, messageCallback); - - expect(messageCallback).toBeCalledWith({ - type: 'broker:nodetypes', - // We're mocking the node types service, so this will - // be undefined. - nodeType: undefined, - }); }); }); @@ -560,5 +554,68 @@ describe('TaskBroker', () => { params: rpcParams, }); }); + + it('should handle `runner:nodetypesrequest` message', async () => { + const runnerId = 'runner1'; + const taskId = 'task1'; + const requesterId = 'requester1'; + const requestId = 'request1'; + const requestParams = [ + { + name: 'n8n-nodes-base.someNode', + version: 1, + }, + ]; + + const message: RunnerMessage.ToBroker.NodeTypesRequest = { + type: 'runner:nodetypesrequest', + taskId, + requestId, + requestParams, + }; + + const requesterMessageCallback = jest.fn(); + + taskBroker.registerRunner(mock({ id: runnerId }), jest.fn()); + taskBroker.setTasks({ + [taskId]: { id: taskId, runnerId, requesterId, taskType: 'test' }, + }); + taskBroker.registerRequester(requesterId, requesterMessageCallback); + + await taskBroker.onRunnerMessage(runnerId, message); + + expect(requesterMessageCallback).toHaveBeenCalledWith({ + type: 'broker:nodetypesrequest', + taskId, + requestId, + requestParams, + }); + }); + }); + + describe('onRequesterMessage', () => { + it('should handle `requester:nodetypesresponse` message', async () => { + const runnerId = 'runner1'; + const taskId = 'task1'; + const requesterId = 'requester1'; + const requestId = 'request1'; + const nodeTypes = [mock(), mock()]; + + const runnerMessageCallback = jest.fn(); + + taskBroker.registerRunner(mock({ id: runnerId }), runnerMessageCallback); + taskBroker.setTasks({ + [taskId]: { id: taskId, runnerId, requesterId, taskType: 'test' }, + }); + + await taskBroker.handleRequesterNodeTypesResponse(taskId, requestId, nodeTypes); + + expect(runnerMessageCallback).toHaveBeenCalledWith({ + type: 'broker:nodetypes', + taskId, + requestId, + nodeTypes, + }); + }); }); }); diff --git a/packages/cli/src/runners/runner-ws-server.ts b/packages/cli/src/runners/runner-ws-server.ts index baaac82bf3..c691462558 100644 --- a/packages/cli/src/runners/runner-ws-server.ts +++ b/packages/cli/src/runners/runner-ws-server.ts @@ -70,7 +70,7 @@ export class TaskRunnerWsServer { this.sendMessage.bind(this, id) as MessageCallback, ); - this.logger.info(`Runner "${message.name}"(${id}) has been registered`); + this.logger.info(`Runner "${message.name}" (${id}) has been registered`); return; } diff --git a/packages/cli/src/runners/task-broker.service.ts b/packages/cli/src/runners/task-broker.service.ts index 7d00cf232e..daa5b48c07 100644 --- a/packages/cli/src/runners/task-broker.service.ts +++ b/packages/cli/src/runners/task-broker.service.ts @@ -8,7 +8,6 @@ import { ApplicationError } from 'n8n-workflow'; import { nanoid } from 'nanoid'; import { Service } from 'typedi'; -import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials'; import { Logger } from '@/logging/logger.service'; import { TaskRejectError } from './errors'; @@ -79,19 +78,7 @@ export class TaskBroker { private pendingTaskRequests: TaskRequest[] = []; - constructor( - private readonly logger: Logger, - private readonly loadNodesAndCredentials: LoadNodesAndCredentials, - ) { - this.loadNodesAndCredentials.addPostProcessor(this.updateNodeTypes); - } - - updateNodeTypes = async () => { - await this.messageAllRunners({ - type: 'broker:nodetypes', - nodeTypes: this.loadNodesAndCredentials.types.nodes, - }); - }; + constructor(private readonly logger: Logger) {} expireTasks() { const now = process.hrtime.bigint(); @@ -105,10 +92,6 @@ export class TaskBroker { registerRunner(runner: TaskRunner, messageCallback: MessageCallback) { this.knownRunners.set(runner.id, { runner, messageCallback }); void this.knownRunners.get(runner.id)!.messageCallback({ type: 'broker:runnerregistered' }); - void this.knownRunners.get(runner.id)!.messageCallback({ - type: 'broker:nodetypes', - nodeTypes: this.loadNodesAndCredentials.types.nodes, - }); } deregisterRunner(runnerId: string, error: Error) { @@ -145,14 +128,6 @@ export class TaskBroker { await this.knownRunners.get(runnerId)?.messageCallback(message); } - private async messageAllRunners(message: BrokerMessage.ToRunner.All) { - await Promise.allSettled( - [...this.knownRunners.values()].map(async (runner) => { - await runner.messageCallback(message); - }), - ); - } - private async messageRequester(requesterId: string, message: BrokerMessage.ToRequester.All) { await this.requesters.get(requesterId)?.(message); } @@ -187,7 +162,9 @@ export class TaskBroker { case 'runner:taskdatarequest': await this.handleDataRequest(message.taskId, message.requestId, message.requestParams); break; - + case 'runner:nodetypesrequest': + await this.handleNodeTypesRequest(message.taskId, message.requestId, message.requestParams); + break; case 'runner:rpc': await this.handleRpcRequest(message.taskId, message.callId, message.name, message.params); break; @@ -249,6 +226,23 @@ export class TaskBroker { }); } + async handleNodeTypesRequest( + taskId: Task['id'], + requestId: RunnerMessage.ToBroker.NodeTypesRequest['requestId'], + requestParams: RunnerMessage.ToBroker.NodeTypesRequest['requestParams'], + ) { + const task = this.tasks.get(taskId); + if (!task) { + return; + } + await this.messageRequester(task.requesterId, { + type: 'broker:nodetypesrequest', + taskId, + requestId, + requestParams, + }); + } + async handleResponse( taskId: Task['id'], requestId: RunnerMessage.ToBroker.TaskDataRequest['requestId'], @@ -284,6 +278,13 @@ export class TaskBroker { case 'requester:taskdataresponse': await this.handleRequesterDataResponse(message.taskId, message.requestId, message.data); break; + case 'requester:nodetypesresponse': + await this.handleRequesterNodeTypesResponse( + message.taskId, + message.requestId, + message.nodeTypes, + ); + break; case 'requester:rpcresponse': await this.handleRequesterRpcResponse( message.taskId, @@ -322,6 +323,21 @@ export class TaskBroker { }); } + async handleRequesterNodeTypesResponse( + taskId: Task['id'], + requestId: RequesterMessage.ToBroker.NodeTypesResponse['requestId'], + nodeTypes: RequesterMessage.ToBroker.NodeTypesResponse['nodeTypes'], + ) { + const runner = await this.getRunnerOrFailTask(taskId); + + await this.messageRunner(runner.id, { + type: 'broker:nodetypes', + taskId, + requestId, + nodeTypes, + }); + } + handleRequesterAccept( taskId: Task['id'], settings: RequesterMessage.ToBroker.TaskSettings['settings'], diff --git a/packages/cli/src/runners/task-managers/local-task-manager.ts b/packages/cli/src/runners/task-managers/local-task-manager.ts index 623b9c912e..7d898aaebe 100644 --- a/packages/cli/src/runners/task-managers/local-task-manager.ts +++ b/packages/cli/src/runners/task-managers/local-task-manager.ts @@ -1,17 +1,20 @@ import type { RequesterMessage } from '@n8n/task-runner'; -import Container from 'typedi'; +import Container, { Service } from 'typedi'; + +import { NodeTypes } from '@/node-types'; import { TaskManager } from './task-manager'; import type { RequesterMessageCallback } from '../task-broker.service'; import { TaskBroker } from '../task-broker.service'; +@Service() export class LocalTaskManager extends TaskManager { taskBroker: TaskBroker; id: string = 'single-main'; - constructor() { - super(); + constructor(nodeTypes: NodeTypes) { + super(nodeTypes); this.registerRequester(); } diff --git a/packages/cli/src/runners/task-managers/task-manager.ts b/packages/cli/src/runners/task-managers/task-manager.ts index 617008a450..21d4e8eb9d 100644 --- a/packages/cli/src/runners/task-managers/task-manager.ts +++ b/packages/cli/src/runners/task-managers/task-manager.ts @@ -17,6 +17,9 @@ import type { } from 'n8n-workflow'; import { createResultOk, createResultError } from 'n8n-workflow'; import { nanoid } from 'nanoid'; +import { Service } from 'typedi'; + +import { NodeTypes } from '@/node-types'; import { DataRequestResponseBuilder } from './data-request-response-builder'; @@ -43,7 +46,8 @@ interface ExecuteFunctionObject { [name: string]: ((...args: unknown[]) => unknown) | ExecuteFunctionObject; } -export class TaskManager { +@Service() +export abstract class TaskManager { requestAcceptRejects: Map = new Map(); taskAcceptRejects: Map = new Map(); @@ -52,6 +56,8 @@ export class TaskManager { tasks: Map = new Map(); + constructor(private readonly nodeTypes: NodeTypes) {} + async startTask( additionalData: IWorkflowExecuteAdditionalData, taskType: string, @@ -173,6 +179,9 @@ export class TaskManager { case 'broker:taskdatarequest': this.sendTaskData(message.taskId, message.requestId, message.requestParams); break; + case 'broker:nodetypesrequest': + this.sendNodeTypes(message.taskId, message.requestId, message.requestParams); + break; case 'broker:rpc': void this.handleRpc(message.taskId, message.callId, message.name, message.params); break; @@ -239,6 +248,21 @@ export class TaskManager { }); } + sendNodeTypes( + taskId: string, + requestId: string, + neededNodeTypes: BrokerMessage.ToRequester.NodeTypesRequest['requestParams'], + ) { + const nodeTypes = this.nodeTypes.getNodeTypeDescriptions(neededNodeTypes); + + this.sendMessage({ + type: 'requester:nodetypesresponse', + taskId, + requestId, + nodeTypes, + }); + } + async handleRpc( taskId: string, callId: string, diff --git a/packages/cli/src/runners/task-runner-module.ts b/packages/cli/src/runners/task-runner-module.ts index 13521c599b..f1383f1fba 100644 --- a/packages/cli/src/runners/task-runner-module.ts +++ b/packages/cli/src/runners/task-runner-module.ts @@ -54,7 +54,7 @@ export class TaskRunnerModule { private async loadTaskManager() { const { TaskManager } = await import('@/runners/task-managers/task-manager'); const { LocalTaskManager } = await import('@/runners/task-managers/local-task-manager'); - this.taskManager = new LocalTaskManager(); + this.taskManager = Container.get(LocalTaskManager); Container.set(TaskManager, this.taskManager); }