perf(core): Load node types on demand on runners (no-changelog) (#11559)

This commit is contained in:
Iván Ovejero 2024-11-06 13:39:31 +01:00 committed by GitHub
parent befa26f89a
commit ca75020821
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 345 additions and 51 deletions

View file

@ -63,4 +63,35 @@ describe('TaskRunnerNodeTypes', () => {
});
});
});
describe('addNodeTypeDescriptions', () => {
it('should add new node types', () => {
const nodeTypes = new TaskRunnerNodeTypes(TYPES);
const nodeTypeDescriptions = [
{ name: 'new-type', version: 1 },
{ name: 'new-type', version: 2 },
] as INodeTypeDescription[];
nodeTypes.addNodeTypeDescriptions(nodeTypeDescriptions);
expect(nodeTypes.getByNameAndVersion('new-type', 1)).toEqual({
description: { name: 'new-type', version: 1 },
});
expect(nodeTypes.getByNameAndVersion('new-type', 2)).toEqual({
description: { name: 'new-type', version: 2 },
});
});
});
describe('onlyUnknown', () => {
it('should return only unknown node types', () => {
const nodeTypes = new TaskRunnerNodeTypes(TYPES);
const candidate = { name: 'unknown', version: 1 };
expect(nodeTypes.onlyUnknown([candidate])).toEqual([candidate]);
expect(nodeTypes.onlyUnknown([SINGLE_VERSIONED])).toEqual([]);
});
});
});

View file

@ -17,6 +17,7 @@ import type {
IRunExecutionData,
WorkflowExecuteMode,
EnvProviderState,
INodeTypeDescription,
} from 'n8n-workflow';
import * as a from 'node:assert';
import { runInNewContext, type Context } from 'node:vm';
@ -119,6 +120,29 @@ export class JsTaskRunner extends TaskRunner {
neededBuiltIns.toDataRequestParams(),
);
/**
* We request node types only when we know a task needs all nodes, because
* needing all nodes means that the task relies on paired item functionality,
* which is the same requirement for needing node types.
*/
if (neededBuiltIns.needsAllNodes) {
const uniqueNodeTypes = new Map(
data.workflow.nodes.map((node) => [
`${node.type}|${node.typeVersion}`,
{ name: node.type, version: node.typeVersion },
]),
);
const unknownNodeTypes = this.nodeTypes.onlyUnknown([...uniqueNodeTypes.values()]);
const nodeTypes = await this.requestNodeTypes<INodeTypeDescription[]>(
task.taskId,
unknownNodeTypes,
);
this.nodeTypes.addNodeTypeDescriptions(nodeTypes);
}
const workflowParams = data.workflow;
const workflow = new Workflow({
...workflowParams,

View file

@ -1,6 +1,11 @@
import type { INodeTypeBaseDescription } from 'n8n-workflow';
import type { RPC_ALLOW_LIST, TaskDataRequestParams, TaskResultData } from './runner-types';
import type {
NeededNodeType,
RPC_ALLOW_LIST,
TaskDataRequestParams,
TaskResultData,
} from './runner-types';
export namespace BrokerMessage {
export namespace ToRunner {
@ -47,6 +52,8 @@ export namespace BrokerMessage {
export interface NodeTypes {
type: 'broker:nodetypes';
taskId: string;
requestId: string;
nodeTypes: INodeTypeBaseDescription[];
}
@ -87,6 +94,13 @@ export namespace BrokerMessage {
requestParams: TaskDataRequestParams;
}
export interface NodeTypesRequest {
type: 'broker:nodetypesrequest';
taskId: string;
requestId: string;
requestParams: NeededNodeType[];
}
export interface RPC {
type: 'broker:rpc';
callId: string;
@ -95,7 +109,7 @@ export namespace BrokerMessage {
params: unknown[];
}
export type All = TaskReady | TaskDone | TaskError | TaskDataRequest | RPC;
export type All = TaskReady | TaskDone | TaskError | TaskDataRequest | NodeTypesRequest | RPC;
}
}
@ -120,6 +134,13 @@ export namespace RequesterMessage {
data: unknown;
}
export interface NodeTypesResponse {
type: 'requester:nodetypesresponse';
taskId: string;
requestId: string;
nodeTypes: INodeTypeBaseDescription[];
}
export interface RPCResponse {
type: 'requester:rpcresponse';
taskId: string;
@ -134,7 +155,13 @@ export namespace RequesterMessage {
taskType: string;
}
export type All = TaskSettings | TaskCancel | RPCResponse | TaskDataResponse | TaskRequest;
export type All =
| TaskSettings
| TaskCancel
| RPCResponse
| TaskDataResponse
| NodeTypesResponse
| TaskRequest;
}
}
@ -183,6 +210,25 @@ export namespace RunnerMessage {
requestParams: TaskDataRequestParams;
}
export interface NodeTypesRequest {
type: 'runner:nodetypesrequest';
taskId: string;
requestId: string;
/**
* Which node types should be included in the runner's node types request.
*
* Node types are needed only when the script relies on paired item functionality.
* If so, we need only the node types not already cached in the runner.
*
* TODO: In future we can trim this down to only node types in the paired item chain,
* rather than assuming we need all node types in the workflow.
*
* @example [{ name: 'n8n-nodes-base.httpRequest', version: 1 }]
*/
requestParams: NeededNodeType[];
}
export interface RPC {
type: 'runner:rpc';
callId: string;
@ -199,6 +245,7 @@ export namespace RunnerMessage {
| TaskRejected
| TaskOffer
| RPC
| TaskDataRequest;
| TaskDataRequest
| NodeTypesRequest;
}
}

View file

@ -7,6 +7,8 @@ import {
type IVersionedNodeType,
} from 'n8n-workflow';
import type { NeededNodeType } from './runner-types';
type VersionedTypes = Map<number, INodeTypeDescription>;
export const DEFAULT_NODETYPE_VERSION = 1;
@ -61,4 +63,30 @@ export class TaskRunnerNodeTypes implements INodeTypes {
getKnownTypes(): IDataObject {
throw new ApplicationError('Unimplemented `getKnownTypes`', { level: 'error' });
}
addNodeTypeDescriptions(nodeTypeDescriptions: INodeTypeDescription[]) {
const newNodeTypes = this.parseNodeTypes(nodeTypeDescriptions);
for (const [name, newVersions] of newNodeTypes.entries()) {
if (!this.nodeTypesByVersion.has(name)) {
this.nodeTypesByVersion.set(name, newVersions);
} else {
const existingVersions = this.nodeTypesByVersion.get(name)!;
for (const [version, nodeType] of newVersions.entries()) {
existingVersions.set(version, nodeType);
}
}
}
}
/** Filter out node type versions that are already registered. */
onlyUnknown(nodeTypes: NeededNodeType[]) {
return nodeTypes.filter(({ name, version }) => {
const existingVersions = this.nodeTypesByVersion.get(name);
if (!existingVersions) return true;
return !existingVersions.has(version);
});
}
}

View file

@ -112,3 +112,6 @@ export const RPC_ALLOW_LIST = [
'helpers.httpRequest',
'logNodeOutput',
] as const;
/** Node types needed for the runner to execute a task. */
export type NeededNodeType = { name: string; version: number };

View file

@ -1,4 +1,4 @@
import { ApplicationError, type INodeTypeDescription } from 'n8n-workflow';
import { ApplicationError } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { type MessageEvent, WebSocket } from 'ws';
@ -25,6 +25,12 @@ interface DataRequest {
reject: (error: unknown) => void;
}
interface NodeTypesRequest {
requestId: string;
resolve: (data: unknown) => void;
reject: (error: unknown) => void;
}
interface RPCCall {
callId: string;
resolve: (data: unknown) => void;
@ -58,6 +64,8 @@ export abstract class TaskRunner {
dataRequests: Map<DataRequest['requestId'], DataRequest> = new Map();
nodeTypesRequests: Map<NodeTypesRequest['requestId'], NodeTypesRequest> = new Map();
rpcCalls: Map<RPCCall['callId'], RPCCall> = new Map();
nodeTypes: TaskRunnerNodeTypes = new TaskRunnerNodeTypes([]);
@ -168,15 +176,11 @@ export abstract class TaskRunner {
this.handleRpcResponse(message.callId, message.status, message.data);
break;
case 'broker:nodetypes':
this.setNodeTypes(message.nodeTypes as unknown as INodeTypeDescription[]);
this.processNodeTypesResponse(message.requestId, message.nodeTypes);
break;
}
}
setNodeTypes(nodeTypes: INodeTypeDescription[]) {
this.nodeTypes = new TaskRunnerNodeTypes(nodeTypes);
}
processDataResponse(requestId: string, data: unknown) {
const request = this.dataRequests.get(requestId);
if (!request) {
@ -187,6 +191,16 @@ export abstract class TaskRunner {
request.resolve(data);
}
processNodeTypesResponse(requestId: string, nodeTypes: unknown) {
const request = this.nodeTypesRequests.get(requestId);
if (!request) return;
// Deleting of the request is handled in `requestNodeTypes`, using a
// `finally` wrapped around the return
request.resolve(nodeTypes);
}
hasOpenTasks() {
return Object.values(this.runningTasks).length < this.maxConcurrency;
}
@ -282,6 +296,34 @@ export abstract class TaskRunner {
throw new ApplicationError('Unimplemented');
}
async requestNodeTypes<T = unknown>(
taskId: Task['taskId'],
requestParams: RunnerMessage.ToBroker.NodeTypesRequest['requestParams'],
) {
const requestId = nanoid();
const nodeTypesPromise = new Promise<T>((resolve, reject) => {
this.nodeTypesRequests.set(requestId, {
requestId,
resolve: resolve as (data: unknown) => void,
reject,
});
});
this.send({
type: 'runner:nodetypesrequest',
taskId,
requestId,
requestParams,
});
try {
return await nodeTypesPromise;
} finally {
this.nodeTypesRequests.delete(requestId);
}
}
async requestData<T = unknown>(
taskId: Task['taskId'],
requestParams: RunnerMessage.ToBroker.TaskDataRequest['requestParams'],

View file

@ -1,3 +1,4 @@
import type { NeededNodeType } from '@n8n/task-runner';
import type { Dirent } from 'fs';
import { readdir } from 'fs/promises';
import { loadClassInIsolation } from 'n8n-core';
@ -149,4 +150,22 @@ export class NodeTypes implements INodeTypes {
dirent.name.toLowerCase().startsWith('v')
);
}
getNodeTypeDescriptions(nodeTypes: NeededNodeType[]): INodeTypeDescription[] {
return nodeTypes.map(({ name: nodeTypeName, version: nodeTypeVersion }) => {
const nodeType = this.getNode(nodeTypeName);
if (!nodeType) throw new ApplicationError(`Unknown node type: ${nodeTypeName}`);
const { description } = NodeHelpers.getVersionedNodeType(nodeType.type, nodeTypeVersion);
const descriptionCopy = { ...description };
descriptionCopy.name = descriptionCopy.name.startsWith('n8n-nodes')
? descriptionCopy.name
: `n8n-nodes-base.${descriptionCopy.name}`; // nodes-base nodes are unprefixed
return descriptionCopy;
});
}
}

View file

@ -1,5 +1,6 @@
import type { RunnerMessage, TaskResultData } from '@n8n/task-runner';
import { mock } from 'jest-mock-extended';
import type { INodeTypeBaseDescription } from 'n8n-workflow';
import { TaskRejectError } from '../errors';
import { TaskBroker } from '../task-broker.service';
@ -11,7 +12,7 @@ describe('TaskBroker', () => {
let taskBroker: TaskBroker;
beforeEach(() => {
taskBroker = new TaskBroker(mock(), mock());
taskBroker = new TaskBroker(mock());
jest.restoreAllMocks();
});
@ -76,13 +77,6 @@ describe('TaskBroker', () => {
const messageCallback = jest.fn();
taskBroker.registerRunner(runner, messageCallback);
expect(messageCallback).toBeCalledWith({
type: 'broker:nodetypes',
// We're mocking the node types service, so this will
// be undefined.
nodeType: undefined,
});
});
});
@ -560,5 +554,68 @@ describe('TaskBroker', () => {
params: rpcParams,
});
});
it('should handle `runner:nodetypesrequest` message', async () => {
const runnerId = 'runner1';
const taskId = 'task1';
const requesterId = 'requester1';
const requestId = 'request1';
const requestParams = [
{
name: 'n8n-nodes-base.someNode',
version: 1,
},
];
const message: RunnerMessage.ToBroker.NodeTypesRequest = {
type: 'runner:nodetypesrequest',
taskId,
requestId,
requestParams,
};
const requesterMessageCallback = jest.fn();
taskBroker.registerRunner(mock<TaskRunner>({ id: runnerId }), jest.fn());
taskBroker.setTasks({
[taskId]: { id: taskId, runnerId, requesterId, taskType: 'test' },
});
taskBroker.registerRequester(requesterId, requesterMessageCallback);
await taskBroker.onRunnerMessage(runnerId, message);
expect(requesterMessageCallback).toHaveBeenCalledWith({
type: 'broker:nodetypesrequest',
taskId,
requestId,
requestParams,
});
});
});
describe('onRequesterMessage', () => {
it('should handle `requester:nodetypesresponse` message', async () => {
const runnerId = 'runner1';
const taskId = 'task1';
const requesterId = 'requester1';
const requestId = 'request1';
const nodeTypes = [mock<INodeTypeBaseDescription>(), mock<INodeTypeBaseDescription>()];
const runnerMessageCallback = jest.fn();
taskBroker.registerRunner(mock<TaskRunner>({ id: runnerId }), runnerMessageCallback);
taskBroker.setTasks({
[taskId]: { id: taskId, runnerId, requesterId, taskType: 'test' },
});
await taskBroker.handleRequesterNodeTypesResponse(taskId, requestId, nodeTypes);
expect(runnerMessageCallback).toHaveBeenCalledWith({
type: 'broker:nodetypes',
taskId,
requestId,
nodeTypes,
});
});
});
});

View file

@ -70,7 +70,7 @@ export class TaskRunnerWsServer {
this.sendMessage.bind(this, id) as MessageCallback,
);
this.logger.info(`Runner "${message.name}"(${id}) has been registered`);
this.logger.info(`Runner "${message.name}" (${id}) has been registered`);
return;
}

View file

@ -8,7 +8,6 @@ import { ApplicationError } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { Service } from 'typedi';
import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
import { Logger } from '@/logging/logger.service';
import { TaskRejectError } from './errors';
@ -79,19 +78,7 @@ export class TaskBroker {
private pendingTaskRequests: TaskRequest[] = [];
constructor(
private readonly logger: Logger,
private readonly loadNodesAndCredentials: LoadNodesAndCredentials,
) {
this.loadNodesAndCredentials.addPostProcessor(this.updateNodeTypes);
}
updateNodeTypes = async () => {
await this.messageAllRunners({
type: 'broker:nodetypes',
nodeTypes: this.loadNodesAndCredentials.types.nodes,
});
};
constructor(private readonly logger: Logger) {}
expireTasks() {
const now = process.hrtime.bigint();
@ -105,10 +92,6 @@ export class TaskBroker {
registerRunner(runner: TaskRunner, messageCallback: MessageCallback) {
this.knownRunners.set(runner.id, { runner, messageCallback });
void this.knownRunners.get(runner.id)!.messageCallback({ type: 'broker:runnerregistered' });
void this.knownRunners.get(runner.id)!.messageCallback({
type: 'broker:nodetypes',
nodeTypes: this.loadNodesAndCredentials.types.nodes,
});
}
deregisterRunner(runnerId: string, error: Error) {
@ -145,14 +128,6 @@ export class TaskBroker {
await this.knownRunners.get(runnerId)?.messageCallback(message);
}
private async messageAllRunners(message: BrokerMessage.ToRunner.All) {
await Promise.allSettled(
[...this.knownRunners.values()].map(async (runner) => {
await runner.messageCallback(message);
}),
);
}
private async messageRequester(requesterId: string, message: BrokerMessage.ToRequester.All) {
await this.requesters.get(requesterId)?.(message);
}
@ -187,7 +162,9 @@ export class TaskBroker {
case 'runner:taskdatarequest':
await this.handleDataRequest(message.taskId, message.requestId, message.requestParams);
break;
case 'runner:nodetypesrequest':
await this.handleNodeTypesRequest(message.taskId, message.requestId, message.requestParams);
break;
case 'runner:rpc':
await this.handleRpcRequest(message.taskId, message.callId, message.name, message.params);
break;
@ -249,6 +226,23 @@ export class TaskBroker {
});
}
async handleNodeTypesRequest(
taskId: Task['id'],
requestId: RunnerMessage.ToBroker.NodeTypesRequest['requestId'],
requestParams: RunnerMessage.ToBroker.NodeTypesRequest['requestParams'],
) {
const task = this.tasks.get(taskId);
if (!task) {
return;
}
await this.messageRequester(task.requesterId, {
type: 'broker:nodetypesrequest',
taskId,
requestId,
requestParams,
});
}
async handleResponse(
taskId: Task['id'],
requestId: RunnerMessage.ToBroker.TaskDataRequest['requestId'],
@ -284,6 +278,13 @@ export class TaskBroker {
case 'requester:taskdataresponse':
await this.handleRequesterDataResponse(message.taskId, message.requestId, message.data);
break;
case 'requester:nodetypesresponse':
await this.handleRequesterNodeTypesResponse(
message.taskId,
message.requestId,
message.nodeTypes,
);
break;
case 'requester:rpcresponse':
await this.handleRequesterRpcResponse(
message.taskId,
@ -322,6 +323,21 @@ export class TaskBroker {
});
}
async handleRequesterNodeTypesResponse(
taskId: Task['id'],
requestId: RequesterMessage.ToBroker.NodeTypesResponse['requestId'],
nodeTypes: RequesterMessage.ToBroker.NodeTypesResponse['nodeTypes'],
) {
const runner = await this.getRunnerOrFailTask(taskId);
await this.messageRunner(runner.id, {
type: 'broker:nodetypes',
taskId,
requestId,
nodeTypes,
});
}
handleRequesterAccept(
taskId: Task['id'],
settings: RequesterMessage.ToBroker.TaskSettings['settings'],

View file

@ -1,17 +1,20 @@
import type { RequesterMessage } from '@n8n/task-runner';
import Container from 'typedi';
import Container, { Service } from 'typedi';
import { NodeTypes } from '@/node-types';
import { TaskManager } from './task-manager';
import type { RequesterMessageCallback } from '../task-broker.service';
import { TaskBroker } from '../task-broker.service';
@Service()
export class LocalTaskManager extends TaskManager {
taskBroker: TaskBroker;
id: string = 'single-main';
constructor() {
super();
constructor(nodeTypes: NodeTypes) {
super(nodeTypes);
this.registerRequester();
}

View file

@ -17,6 +17,9 @@ import type {
} from 'n8n-workflow';
import { createResultOk, createResultError } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { Service } from 'typedi';
import { NodeTypes } from '@/node-types';
import { DataRequestResponseBuilder } from './data-request-response-builder';
@ -43,7 +46,8 @@ interface ExecuteFunctionObject {
[name: string]: ((...args: unknown[]) => unknown) | ExecuteFunctionObject;
}
export class TaskManager {
@Service()
export abstract class TaskManager {
requestAcceptRejects: Map<string, { accept: RequestAccept; reject: RequestReject }> = new Map();
taskAcceptRejects: Map<string, { accept: TaskAccept; reject: TaskReject }> = new Map();
@ -52,6 +56,8 @@ export class TaskManager {
tasks: Map<string, Task> = new Map();
constructor(private readonly nodeTypes: NodeTypes) {}
async startTask<TData, TError>(
additionalData: IWorkflowExecuteAdditionalData,
taskType: string,
@ -173,6 +179,9 @@ export class TaskManager {
case 'broker:taskdatarequest':
this.sendTaskData(message.taskId, message.requestId, message.requestParams);
break;
case 'broker:nodetypesrequest':
this.sendNodeTypes(message.taskId, message.requestId, message.requestParams);
break;
case 'broker:rpc':
void this.handleRpc(message.taskId, message.callId, message.name, message.params);
break;
@ -239,6 +248,21 @@ export class TaskManager {
});
}
sendNodeTypes(
taskId: string,
requestId: string,
neededNodeTypes: BrokerMessage.ToRequester.NodeTypesRequest['requestParams'],
) {
const nodeTypes = this.nodeTypes.getNodeTypeDescriptions(neededNodeTypes);
this.sendMessage({
type: 'requester:nodetypesresponse',
taskId,
requestId,
nodeTypes,
});
}
async handleRpc(
taskId: string,
callId: string,

View file

@ -54,7 +54,7 @@ export class TaskRunnerModule {
private async loadTaskManager() {
const { TaskManager } = await import('@/runners/task-managers/task-manager');
const { LocalTaskManager } = await import('@/runners/task-managers/local-task-manager');
this.taskManager = new LocalTaskManager();
this.taskManager = Container.get(LocalTaskManager);
Container.set(TaskManager, this.taskManager);
}