feat: Add proper INodeTypes polyfill for Task Runner (no-changelog) (#11333)

This commit is contained in:
Val 2024-10-22 15:19:32 +01:00 committed by GitHub
parent 00a48b781e
commit cade9b2d91
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 202 additions and 21 deletions

View file

@ -0,0 +1,66 @@
import type { INodeTypeDescription } from 'n8n-workflow';
import { TaskRunnerNodeTypes } from '../node-types';
const SINGLE_VERSIONED = { name: 'single-versioned', version: 1 };
const SINGLE_UNVERSIONED = { name: 'single-unversioned' };
const MULTI_VERSIONED = { name: 'multi-versioned', version: [1, 2] };
const SPLIT_VERSIONED = [
{ name: 'split-versioned', version: 1 },
{ name: 'split-versioned', version: 2 },
];
const TYPES: INodeTypeDescription[] = [
SINGLE_VERSIONED,
SINGLE_UNVERSIONED,
MULTI_VERSIONED,
...SPLIT_VERSIONED,
] as INodeTypeDescription[];
describe('TaskRunnerNodeTypes', () => {
describe('getByNameAndVersion', () => {
let nodeTypes: TaskRunnerNodeTypes;
beforeEach(() => {
nodeTypes = new TaskRunnerNodeTypes(TYPES);
});
it('should return undefined if not found', () => {
expect(nodeTypes.getByNameAndVersion('unknown', 1)).toBeUndefined();
});
it('should return highest versioned node type if no version is given', () => {
expect(nodeTypes.getByNameAndVersion('split-versioned')).toEqual({
description: SPLIT_VERSIONED[1],
});
});
it('should return specified version for split version', () => {
expect(nodeTypes.getByNameAndVersion('split-versioned', 1)).toEqual({
description: SPLIT_VERSIONED[0],
});
});
it('should return undefined on unknown version', () => {
expect(nodeTypes.getByNameAndVersion('split-versioned', 3)).toBeUndefined();
});
it('should return specified version for multi version', () => {
expect(nodeTypes.getByNameAndVersion('multi-versioned', 1)).toEqual({
description: MULTI_VERSIONED,
});
expect(nodeTypes.getByNameAndVersion('multi-versioned', 2)).toEqual({
description: MULTI_VERSIONED,
});
});
it('should default to DEFAULT_NODETYPE_VERSION if no version specified', () => {
expect(nodeTypes.getByNameAndVersion('single-unversioned', 1)).toEqual({
description: SINGLE_UNVERSIONED,
});
});
});
});

View file

@ -7,7 +7,6 @@ import {
import type {
CodeExecutionMode,
INode,
INodeType,
ITaskDataConnections,
IWorkflowExecuteAdditionalData,
WorkflowParameters,
@ -129,17 +128,7 @@ export class JsTaskRunner extends TaskRunner {
const workflowParams = allData.workflow;
const workflow = new Workflow({
...workflowParams,
nodeTypes: {
getByNameAndVersion() {
return undefined as unknown as INodeType;
},
getByName() {
return undefined as unknown as INodeType;
},
getKnownTypes() {
return {};
},
},
nodeTypes: this.nodeTypes,
});
const customConsole = {

View file

@ -0,0 +1,64 @@
import {
ApplicationError,
type IDataObject,
type INodeType,
type INodeTypeDescription,
type INodeTypes,
type IVersionedNodeType,
} from 'n8n-workflow';
type VersionedTypes = Map<number, INodeTypeDescription>;
export const DEFAULT_NODETYPE_VERSION = 1;
export class TaskRunnerNodeTypes implements INodeTypes {
private nodeTypesByVersion: Map<string, VersionedTypes>;
constructor(nodeTypes: INodeTypeDescription[]) {
this.nodeTypesByVersion = this.parseNodeTypes(nodeTypes);
}
private parseNodeTypes(nodeTypes: INodeTypeDescription[]): Map<string, VersionedTypes> {
const versionedTypes = new Map<string, VersionedTypes>();
for (const nt of nodeTypes) {
const versions = Array.isArray(nt.version)
? nt.version
: [nt.version ?? DEFAULT_NODETYPE_VERSION];
const versioned: VersionedTypes =
versionedTypes.get(nt.name) ?? new Map<number, INodeTypeDescription>();
for (const version of versions) {
versioned.set(version, { ...versioned.get(version), ...nt });
}
versionedTypes.set(nt.name, versioned);
}
return versionedTypes;
}
// This isn't used in Workflow from what I can see
getByName(_nodeType: string): INodeType | IVersionedNodeType {
throw new ApplicationError('Unimplemented `getByName`', { level: 'error' });
}
getByNameAndVersion(nodeType: string, version?: number): INodeType {
const versions = this.nodeTypesByVersion.get(nodeType);
if (!versions) {
return undefined as unknown as INodeType;
}
const nodeVersion = versions.get(version ?? Math.max(...versions.keys()));
if (!nodeVersion) {
return undefined as unknown as INodeType;
}
return {
description: nodeVersion,
};
}
// This isn't used in Workflow from what I can see
getKnownTypes(): IDataObject {
throw new ApplicationError('Unimplemented `getKnownTypes`', { level: 'error' });
}
}

View file

@ -1,4 +1,4 @@
import type { INodeExecutionData } from 'n8n-workflow';
import type { INodeExecutionData, INodeTypeBaseDescription } from 'n8n-workflow';
export type DataRequestType = 'input' | 'node' | 'all';
@ -50,6 +50,11 @@ export namespace N8nMessage {
data: unknown;
}
export interface NodeTypes {
type: 'broker:nodetypes';
nodeTypes: INodeTypeBaseDescription[];
}
export type All =
| InfoRequest
| TaskOfferAccept
@ -57,7 +62,8 @@ export namespace N8nMessage {
| TaskSettings
| RunnerRegistered
| RPCResponse
| TaskDataResponse;
| TaskDataResponse
| NodeTypes;
}
export namespace ToRequester {

View file

@ -1,8 +1,9 @@
import { ApplicationError } from 'n8n-workflow';
import { ApplicationError, type INodeTypeDescription } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { URL } from 'node:url';
import { type MessageEvent, WebSocket } from 'ws';
import { TaskRunnerNodeTypes } from './node-types';
import {
RPC_ALLOW_LIST,
type RunnerMessage,
@ -58,6 +59,8 @@ export abstract class TaskRunner {
rpcCalls: Map<RPCCall['callId'], RPCCall> = new Map();
nodeTypes: TaskRunnerNodeTypes = new TaskRunnerNodeTypes([]);
constructor(
public taskType: string,
wsUrl: string,
@ -158,9 +161,17 @@ export abstract class TaskRunner {
break;
case 'broker:rpcresponse':
this.handleRpcResponse(message.callId, message.status, message.data);
break;
case 'broker:nodetypes':
this.setNodeTypes(message.nodeTypes as unknown as INodeTypeDescription[]);
break;
}
}
setNodeTypes(nodeTypes: INodeTypeDescription[]) {
this.nodeTypes = new TaskRunnerNodeTypes(nodeTypes);
}
processDataResponse(requestId: string, data: unknown) {
const request = this.dataRequests.get(requestId);
if (!request) {

View file

@ -11,7 +11,7 @@ describe('TaskBroker', () => {
let taskBroker: TaskBroker;
beforeEach(() => {
taskBroker = new TaskBroker(mock());
taskBroker = new TaskBroker(mock(), mock());
jest.restoreAllMocks();
});
@ -69,6 +69,21 @@ describe('TaskBroker', () => {
expect(knownRunners.get(runnerId)?.runner).toEqual(runner);
expect(knownRunners.get(runnerId)?.messageCallback).toEqual(messageCallback);
});
it('should send node types to runner', () => {
const runnerId = 'runner1';
const runner = mock<TaskRunner>({ id: runnerId });
const messageCallback = jest.fn();
taskBroker.registerRunner(runner, messageCallback);
expect(messageCallback).toBeCalledWith({
type: 'broker:nodetypes',
// We're mocking the node types service, so this will
// be undefined.
nodeType: undefined,
});
});
});
describe('registerRequester', () => {

View file

@ -1,5 +1,5 @@
import type { Response } from 'express';
import type { INodeExecutionData } from 'n8n-workflow';
import type { INodeExecutionData, INodeTypeBaseDescription } from 'n8n-workflow';
import type WebSocket from 'ws';
import type { TaskRunner } from './task-broker.service';
@ -62,6 +62,11 @@ export namespace N8nMessage {
data: unknown;
}
export interface NodeTypes {
type: 'broker:nodetypes';
nodeTypes: INodeTypeBaseDescription[];
}
export type All =
| InfoRequest
| TaskOfferAccept
@ -69,7 +74,8 @@ export namespace N8nMessage {
| TaskSettings
| RunnerRegistered
| RPCResponse
| TaskDataResponse;
| TaskDataResponse
| NodeTypes;
}
export namespace ToRequester {

View file

@ -87,8 +87,6 @@ export class TaskRunnerService {
this.sendMessage.bind(this, id) as MessageCallback,
);
this.sendMessage(id, { type: 'broker:runnerregistered' });
this.logger.info(`Runner "${message.name}"(${id}) has been registered`);
return;
}

View file

@ -2,6 +2,7 @@ import { ApplicationError } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { Service } from 'typedi';
import { LoadNodesAndCredentials } from '@/load-nodes-and-credentials';
import { Logger } from '@/logging/logger.service';
import { TaskRejectError } from './errors';
@ -71,7 +72,19 @@ export class TaskBroker {
private pendingTaskRequests: TaskRequest[] = [];
constructor(private readonly logger: Logger) {}
constructor(
private readonly logger: Logger,
private readonly loadNodesAndCredentials: LoadNodesAndCredentials,
) {
this.loadNodesAndCredentials.addPostProcessor(this.updateNodeTypes);
}
updateNodeTypes = async () => {
await this.messageAllRunners({
type: 'broker:nodetypes',
nodeTypes: this.loadNodesAndCredentials.types.nodes,
});
};
expireTasks() {
const now = process.hrtime.bigint();
@ -84,6 +97,11 @@ export class TaskBroker {
registerRunner(runner: TaskRunner, messageCallback: MessageCallback) {
this.knownRunners.set(runner.id, { runner, messageCallback });
void this.knownRunners.get(runner.id)!.messageCallback({ type: 'broker:runnerregistered' });
void this.knownRunners.get(runner.id)!.messageCallback({
type: 'broker:nodetypes',
nodeTypes: this.loadNodesAndCredentials.types.nodes,
});
}
deregisterRunner(runnerId: string) {
@ -117,6 +135,14 @@ export class TaskBroker {
await this.knownRunners.get(runnerId)?.messageCallback(message);
}
private async messageAllRunners(message: N8nMessage.ToRunner.All) {
await Promise.allSettled(
[...this.knownRunners.values()].map(async (runner) => {
await runner.messageCallback(message);
}),
);
}
private async messageRequester(requesterId: string, message: N8nMessage.ToRequester.All) {
await this.requesters.get(requesterId)?.(message);
}