Remove core and workflow imports from runner

This commit is contained in:
Iván Ovejero 2024-11-11 14:15:32 +01:00
parent f1e2df7d07
commit 1952721d72
No known key found for this signature in database
11 changed files with 256 additions and 176 deletions

View file

@ -6,7 +6,7 @@
"start": "node dist/start.js", "start": "node dist/start.js",
"dev": "pnpm build && pnpm start", "dev": "pnpm build && pnpm start",
"typecheck": "tsc --noEmit", "typecheck": "tsc --noEmit",
"build": "tsc -p ./tsconfig.build.json && tsc-alias -p tsconfig.build.json", "build": "tsc -p ./tsconfig.build.json --noEmitOnError && tsc-alias -p tsconfig.build.json",
"format": "biome format --write src", "format": "biome format --write src",
"format:check": "biome ci src", "format:check": "biome ci src",
"test": "jest", "test": "jest",

View file

@ -1,7 +1,23 @@
import type { IDataObject, INode, INodeExecutionData, ITaskData } from 'n8n-workflow'; import type { IDataObject, INode, INodeExecutionData, ITaskData } from 'n8n-workflow';
import { NodeConnectionType } from 'n8n-workflow'; // import { NodeConnectionType } from 'n8n-workflow';
import { nanoid } from 'nanoid'; import { nanoid } from 'nanoid';
// eslint-disable-next-line no-restricted-syntax
enum NodeConnectionType {
AiAgent = 'ai_agent',
AiChain = 'ai_chain',
AiDocument = 'ai_document',
AiEmbedding = 'ai_embedding',
AiLanguageModel = 'ai_languageModel',
AiMemory = 'ai_memory',
AiOutputParser = 'ai_outputParser',
AiRetriever = 'ai_retriever',
AiTextSplitter = 'ai_textSplitter',
AiTool = 'ai_tool',
AiVectorStore = 'ai_vectorStore',
Main = 'main',
}
import type { JSExecSettings } from '@/js-task-runner/js-task-runner'; import type { JSExecSettings } from '@/js-task-runner/js-task-runner';
import type { DataRequestResponse } from '@/runner-types'; import type { DataRequestResponse } from '@/runner-types';
import type { Task } from '@/task-runner'; import type { Task } from '@/task-runner';
@ -76,6 +92,7 @@ export const newDataRequestResponse = (
id: '1', id: '1',
name: 'Test Workflow', name: 'Test Workflow',
active: true, active: true,
// @ts-expect-error test
connections: { connections: {
[manualTriggerNode.name]: { [manualTriggerNode.name]: {
main: [[{ node: codeNode.name, type: NodeConnectionType.Main, index: 0 }]], main: [[{ node: codeNode.name, type: NodeConnectionType.Main, index: 0 }]],

View file

@ -1,8 +1,37 @@
import type { CallExpression, Identifier, Node, Program } from 'acorn'; import type { CallExpression, Identifier, Node, Program } from 'acorn';
import { parse } from 'acorn'; import { parse } from 'acorn';
import { ancestor } from 'acorn-walk'; import { ancestor } from 'acorn-walk';
import type { Result } from 'n8n-workflow'; // import type { Result } from 'n8n-workflow';
import { toResult } from 'n8n-workflow';
const ensureError = (e: unknown) => {
if (e instanceof Error) {
return e;
}
return new Error(String(e));
};
export type ResultOk<T> = { ok: true; result: T };
export type ResultError<E> = { ok: false; error: E };
export type Result<T, E> = ResultOk<T> | ResultError<E>;
export const createResultOk = <T>(data: T): ResultOk<T> => ({
ok: true,
result: data,
});
export const createResultError = <E = unknown>(error: E): ResultError<E> => ({
ok: false,
error,
});
export const toResult = <T, E extends Error = Error>(fn: () => T): Result<T, E> => {
try {
return createResultOk<T>(fn());
} catch (e) {
const error = ensureError(e);
return createResultError<E>(error as E);
}
};
import { import {
isAssignmentExpression, isAssignmentExpression,

View file

@ -1,8 +1,6 @@
import { getAdditionalKeys } from 'n8n-core'; // import { getAdditionalKeys } from 'n8n-core';
import { WorkflowDataProxy, Workflow } from 'n8n-workflow';
import type { import type {
CodeExecutionMode, CodeExecutionMode,
IWorkflowExecuteAdditionalData,
IDataObject, IDataObject,
INodeExecutionData, INodeExecutionData,
INodeParameters, INodeParameters,
@ -15,8 +13,10 @@ import type {
IExecuteData, IExecuteData,
INodeTypeDescription, INodeTypeDescription,
} from 'n8n-workflow'; } from 'n8n-workflow';
// import { Workflow } from 'n8n-workflow/Workflow';
// import { WorkflowDataProxy } from 'n8n-workflow/WorkflowDataProxy';
import * as a from 'node:assert'; import * as a from 'node:assert';
import { runInNewContext, type Context } from 'node:vm'; // import { runInNewContext, type Context } from 'node:vm';
import type { MainConfig } from '@/config/main-config'; import type { MainConfig } from '@/config/main-config';
import type { DataRequestResponse, PartialAdditionalData, TaskResultData } from '@/runner-types'; import type { DataRequestResponse, PartialAdditionalData, TaskResultData } from '@/runner-types';
@ -29,7 +29,7 @@ import { ExecutionError } from './errors/execution-error';
import { makeSerializable } from './errors/serializable-error'; import { makeSerializable } from './errors/serializable-error';
import type { RequireResolver } from './require-resolver'; import type { RequireResolver } from './require-resolver';
import { createRequireResolver } from './require-resolver'; import { createRequireResolver } from './require-resolver';
import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation'; // import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation';
import { DataRequestResponseReconstruct } from '../data-request/data-request-response-reconstruct'; import { DataRequestResponseReconstruct } from '../data-request/data-request-response-reconstruct';
export interface JSExecSettings { export interface JSExecSettings {
@ -109,31 +109,33 @@ export class JsTaskRunner extends TaskRunner {
await this.requestNodeTypeIfNeeded(neededBuiltIns, data.workflow, task.taskId); await this.requestNodeTypeIfNeeded(neededBuiltIns, data.workflow, task.taskId);
const workflowParams = data.workflow; const workflowParams = data.workflow;
const workflow = new Workflow({
...workflowParams,
nodeTypes: this.nodeTypes,
});
const customConsole = { return {} as unknown as TaskResultData;
// Send log output back to the main process. It will take care of forwarding // const workflow = new Workflow({
// it to the UI or printing to console. // ...workflowParams,
log: (...args: unknown[]) => { // nodeTypes: this.nodeTypes,
const logOutput = args // });
.map((arg) => (typeof arg === 'object' && arg !== null ? JSON.stringify(arg) : arg))
.join(' ');
void this.makeRpcCall(task.taskId, 'logNodeOutput', [logOutput]);
},
};
const result = // const customConsole = {
settings.nodeMode === 'runOnceForAllItems' // // Send log output back to the main process. It will take care of forwarding
? await this.runForAllItems(task.taskId, settings, data, workflow, customConsole) // // it to the UI or printing to console.
: await this.runForEachItem(task.taskId, settings, data, workflow, customConsole); // log: (...args: unknown[]) => {
// const logOutput = args
// .map((arg) => (typeof arg === 'object' && arg !== null ? JSON.stringify(arg) : arg))
// .join(' ');
// void this.makeRpcCall(task.taskId, 'logNodeOutput', [logOutput]);
// },
// };
return { // const result =
result, // settings.nodeMode === 'runOnceForAllItems'
customData: data.runExecutionData.resultData.metadata, // ? await this.runForAllItems(task.taskId, settings, data, workflow, customConsole)
}; // : await this.runForEachItem(task.taskId, settings, data, workflow, customConsole);
// return {
// result,
// customData: data.runExecutionData.resultData.metadata,
// };
} }
private getNativeVariables() { private getNativeVariables() {
@ -163,155 +165,153 @@ export class JsTaskRunner extends TaskRunner {
/** /**
* Executes the requested code for all items in a single run * Executes the requested code for all items in a single run
*/ */
private async runForAllItems( // private async runForAllItems(
taskId: string, // taskId: string,
settings: JSExecSettings, // settings: JSExecSettings,
data: JsTaskData, // data: JsTaskData,
workflow: Workflow, // workflow: Workflow,
customConsole: CustomConsole, // customConsole: CustomConsole,
): Promise<INodeExecutionData[]> { // ): Promise<INodeExecutionData[]> {
const dataProxy = this.createDataProxy(data, workflow, data.itemIndex); // const dataProxy = this.createDataProxy(data, workflow, data.itemIndex);
const inputItems = data.connectionInputData; // const inputItems = data.connectionInputData;
const context: Context = { // const context: Context = {
require: this.requireResolver, // require: this.requireResolver,
module: {}, // module: {},
console: customConsole, // console: customConsole,
items: inputItems, // items: inputItems,
...this.getNativeVariables(), // ...this.getNativeVariables(),
...dataProxy, // ...dataProxy,
...this.buildRpcCallObject(taskId), // ...this.buildRpcCallObject(taskId),
}; // };
try { // try {
const result = (await runInNewContext( // const result = (await runInNewContext(
`globalThis.global = globalThis; module.exports = async function VmCodeWrapper() {${settings.code}\n}()`, // `globalThis.global = globalThis; module.exports = async function VmCodeWrapper() {${settings.code}\n}()`,
context, // context,
)) as TaskResultData['result']; // )) as TaskResultData['result'];
if (result === null) { // if (result === null) {
return []; // return [];
} // }
return validateRunForAllItemsOutput(result); // // @ts-expect-error test
} catch (e) { // return validateRunForAllItemsOutput(result);
// Errors thrown by the VM are not instances of Error, so map them to an ExecutionError // } catch (e) {
const error = this.toExecutionErrorIfNeeded(e); // // Errors thrown by the VM are not instances of Error, so map them to an ExecutionError
// const error = this.toExecutionErrorIfNeeded(e);
if (settings.continueOnFail) { // if (settings.continueOnFail) {
return [{ json: { error: error.message } }]; // return [{ json: { error: error.message } }];
} // }
throw error; // throw error;
} // }
} // }
/** // /**
* Executes the requested code for each item in the input data // * Executes the requested code for each item in the input data
*/ // */
private async runForEachItem( // private async runForEachItem(
taskId: string, // taskId: string,
settings: JSExecSettings, // settings: JSExecSettings,
data: JsTaskData, // data: JsTaskData,
workflow: Workflow, // workflow: Workflow,
customConsole: CustomConsole, // customConsole: CustomConsole,
): Promise<INodeExecutionData[]> { // ): Promise<INodeExecutionData[]> {
const inputItems = data.connectionInputData; // const inputItems = data.connectionInputData;
const returnData: INodeExecutionData[] = []; // const returnData: INodeExecutionData[] = [];
for (let index = 0; index < inputItems.length; index++) { // for (let index = 0; index < inputItems.length; index++) {
const item = inputItems[index]; // const item = inputItems[index];
const dataProxy = this.createDataProxy(data, workflow, index); // const dataProxy = this.createDataProxy(data, workflow, index);
const context: Context = { // const context: Context = {
require: this.requireResolver, // require: this.requireResolver,
module: {}, // module: {},
console: customConsole, // console: customConsole,
item, // item,
...this.getNativeVariables(), // ...this.getNativeVariables(),
...dataProxy, // ...dataProxy,
...this.buildRpcCallObject(taskId), // ...this.buildRpcCallObject(taskId),
}; // };
try { // try {
let result = (await runInNewContext( // let result = (await runInNewContext(
`module.exports = async function VmCodeWrapper() {${settings.code}\n}()`, // `module.exports = async function VmCodeWrapper() {${settings.code}\n}()`,
context, // context,
)) as INodeExecutionData | undefined; // )) as INodeExecutionData | undefined;
// Filter out null values // // Filter out null values
if (result === null) { // if (result === null) {
continue; // continue;
} // }
result = validateRunForEachItemOutput(result, index); // // @ts-expect-error test
if (result) { // result = validateRunForEachItemOutput(result, index);
returnData.push( // if (result) {
result.binary // returnData.push(
? { // result.binary
json: result.json, // ? {
pairedItem: { item: index }, // json: result.json,
binary: result.binary, // pairedItem: { item: index },
} // binary: result.binary,
: { // }
json: result.json, // : {
pairedItem: { item: index }, // json: result.json,
}, // pairedItem: { item: index },
); // },
} // );
} catch (e) { // }
// Errors thrown by the VM are not instances of Error, so map them to an ExecutionError // } catch (e) {
const error = this.toExecutionErrorIfNeeded(e); // // Errors thrown by the VM are not instances of Error, so map them to an ExecutionError
// const error = this.toExecutionErrorIfNeeded(e);
if (!settings.continueOnFail) { // if (!settings.continueOnFail) {
throw error; // throw error;
} // }
returnData.push({ // returnData.push({
json: { error: error.message }, // json: { error: error.message },
pairedItem: { // pairedItem: {
item: index, // item: index,
}, // },
}); // });
} // }
} // }
return returnData; // return returnData;
} // }
private createDataProxy(data: JsTaskData, workflow: Workflow, itemIndex: number) { // private createDataProxy(data: JsTaskData, workflow: Workflow, itemIndex: number) {
return new WorkflowDataProxy( // return new WorkflowDataProxy(
workflow, // workflow,
data.runExecutionData, // data.runExecutionData,
data.runIndex, // data.runIndex,
itemIndex, // itemIndex,
data.activeNodeName, // data.activeNodeName,
data.connectionInputData, // data.connectionInputData,
data.siblingParameters, // data.siblingParameters,
data.mode, // data.mode,
getAdditionalKeys( // {},
data.additionalData as IWorkflowExecuteAdditionalData, // data.executeData,
data.mode, // data.defaultReturnRunIndex,
data.runExecutionData, // data.selfData,
), // data.contextNodeName,
data.executeData, // // Make sure that even if we don't receive the envProviderState for
data.defaultReturnRunIndex, // // whatever reason, we don't expose the task runner's env to the code
data.selfData, // data.envProviderState ?? {
data.contextNodeName, // env: {},
// Make sure that even if we don't receive the envProviderState for // isEnvAccessBlocked: false,
// whatever reason, we don't expose the task runner's env to the code // isProcessAvailable: true,
data.envProviderState ?? { // },
env: {}, // // Because we optimize the needed data, it can be partially available.
isEnvAccessBlocked: false, // // We assign the available built-ins to the execution context, which
isProcessAvailable: true, // // means we run the getter for '$json', and by default $json throws
}, // // if there is no data available.
// Because we optimize the needed data, it can be partially available. // ).getDataProxy({ throwOnMissingExecutionData: false });
// We assign the available built-ins to the execution context, which // }
// means we run the getter for '$json', and by default $json throws
// if there is no data available.
).getDataProxy({ throwOnMissingExecutionData: false });
}
private toExecutionErrorIfNeeded(error: unknown): Error { private toExecutionErrorIfNeeded(error: unknown): Error {
if (error instanceof Error) { if (error instanceof Error) {

View file

@ -1,4 +1,4 @@
import { ApplicationError } from 'n8n-workflow'; // import { ApplicationError } from 'n8n-workflow';
import { isBuiltin } from 'node:module'; import { isBuiltin } from 'node:module';
import { ExecutionError } from './errors/execution-error'; import { ExecutionError } from './errors/execution-error';
@ -19,6 +19,8 @@ export type RequireResolverOpts = {
export type RequireResolver = (request: string) => unknown; export type RequireResolver = (request: string) => unknown;
const ApplicationError = Error;
export function createRequireResolver({ export function createRequireResolver({
allowedBuiltInModules, allowedBuiltInModules,
allowedExternalModules, allowedExternalModules,

View file

@ -1,4 +1,4 @@
import { normalizeItems } from 'n8n-core'; // import { normalizeItems } from 'n8n-core';
import type { INodeExecutionData } from 'n8n-workflow'; import type { INodeExecutionData } from 'n8n-workflow';
import { ValidationError } from './errors/validation-error'; import { ValidationError } from './errors/validation-error';
@ -71,9 +71,9 @@ export function validateRunForAllItemsOutput(
} }
} }
const returnData = normalizeItems(executionResult); // const returnData = normalizeItems(executionResult);
returnData.forEach(validateItem); // returnData.forEach(validateItem);
return returnData; return executionResult;
} }
/** /**
@ -103,14 +103,14 @@ export function validateRunForEachItemOutput(
}); });
} }
const [returnData] = normalizeItems([executionResult]); // const [returnData] = normalizeItems([executionResult]);
validateItem(returnData, itemIndex); // validateItem(returnData, itemIndex);
// If at least one top-level key is a supported item key (`json`, `binary`, etc.), // If at least one top-level key is a supported item key (`json`, `binary`, etc.),
// and another top-level key is unrecognized, then the user mis-added a property // and another top-level key is unrecognized, then the user mis-added a property
// directly on the item, when they intended to add it on the `json` property // directly on the item, when they intended to add it on the `json` property
validateTopLevelKeys(returnData, itemIndex); // validateTopLevelKeys(returnData, itemIndex);
return returnData; return [executionResult];
} }

View file

@ -1,4 +1,4 @@
import { ensureError } from 'n8n-workflow'; // import { ensureError } from 'n8n-workflow';
import Container from 'typedi'; import Container from 'typedi';
import { MainConfig } from './config/main-config'; import { MainConfig } from './config/main-config';
@ -7,6 +7,13 @@ import { JsTaskRunner } from './js-task-runner/js-task-runner';
let runner: JsTaskRunner | undefined; let runner: JsTaskRunner | undefined;
let isShuttingDown = false; let isShuttingDown = false;
const ensureError = (e: unknown) => {
if (e instanceof Error) {
return e;
}
return new Error(String(e));
};
function createSignalHandler(signal: string) { function createSignalHandler(signal: string) {
return async function onSignal() { return async function onSignal() {
if (isShuttingDown) { if (isShuttingDown) {

View file

@ -1,4 +1,3 @@
import { ApplicationError, ensureError } from 'n8n-workflow';
import { nanoid } from 'nanoid'; import { nanoid } from 'nanoid';
import { type MessageEvent, WebSocket } from 'ws'; import { type MessageEvent, WebSocket } from 'ws';
@ -7,6 +6,15 @@ import type { BrokerMessage, RunnerMessage } from '@/message-types';
import { TaskRunnerNodeTypes } from '@/node-types'; import { TaskRunnerNodeTypes } from '@/node-types';
import { RPC_ALLOW_LIST, type TaskResultData } from '@/runner-types'; import { RPC_ALLOW_LIST, type TaskResultData } from '@/runner-types';
const ensureError = (e: unknown) => {
if (e instanceof Error) {
return e;
}
return new Error(String(e));
};
class ApplicationError extends Error {}
export interface Task<T = unknown> { export interface Task<T = unknown> {
taskId: string; taskId: string;
settings?: T; settings?: T;

View file

@ -11,6 +11,16 @@
"import": "./src/index.ts", "import": "./src/index.ts",
"types": "./dist/index.d.ts" "types": "./dist/index.d.ts"
}, },
"./Workflow": {
"import": "./src/Workflow.ts",
"require": "./dist/Workflow.js",
"types": "./dist/Workflow.d.ts"
},
"./WorkflowDataProxy": {
"import": "./src/WorkflowDataProxy.ts",
"require": "./dist/WorkflowDataProxy.js",
"types": "./dist/WorkflowDataProxy.d.ts"
},
"./*": "./*" "./*": "./*"
}, },
"scripts": { "scripts": {

View file

@ -4,7 +4,12 @@
"composite": true, "composite": true,
"rootDir": "src", "rootDir": "src",
"outDir": "dist", "outDir": "dist",
"tsBuildInfoFile": "dist/build.tsbuildinfo" "tsBuildInfoFile": "dist/build.tsbuildinfo",
"paths": {
"@/*": ["./*"],
"n8n-workflow/Workflow": ["./packages/n8n-workflow/src/Workflow"],
"n8n-workflow/WorkflowDataProxy": ["./packages/n8n-workflow/src/WorkflowDataProxy"]
}
}, },
"include": ["src/**/*.ts"], "include": ["src/**/*.ts"],
"exclude": ["test/**", "src/**/__tests__/**"] "exclude": ["test/**", "src/**/__tests__/**"]

View file

@ -4,7 +4,9 @@
"rootDir": ".", "rootDir": ".",
"baseUrl": "src", "baseUrl": "src",
"paths": { "paths": {
"@/*": ["./*"] "@/*": ["./*"],
"n8n-workflow/Workflow": ["./packages/n8n-workflow/src/Workflow"],
"n8n-workflow/WorkflowDataProxy": ["./packages/n8n-workflow/src/WorkflowDataProxy"]
}, },
"tsBuildInfoFile": "dist/typecheck.tsbuildinfo" "tsBuildInfoFile": "dist/typecheck.tsbuildinfo"
}, },