diff --git a/packages/@n8n/task-runner/package.json b/packages/@n8n/task-runner/package.json index 8350667099..6e080d6590 100644 --- a/packages/@n8n/task-runner/package.json +++ b/packages/@n8n/task-runner/package.json @@ -6,7 +6,7 @@ "start": "node dist/start.js", "dev": "pnpm build && pnpm start", "typecheck": "tsc --noEmit", - "build": "tsc -p ./tsconfig.build.json && tsc-alias -p tsconfig.build.json", + "build": "tsc -p ./tsconfig.build.json --noEmitOnError && tsc-alias -p tsconfig.build.json", "format": "biome format --write src", "format:check": "biome ci src", "test": "jest", diff --git a/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts b/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts index 224f630807..5f12125c07 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts @@ -1,7 +1,23 @@ import type { IDataObject, INode, INodeExecutionData, ITaskData } from 'n8n-workflow'; -import { NodeConnectionType } from 'n8n-workflow'; +// import { NodeConnectionType } from 'n8n-workflow'; import { nanoid } from 'nanoid'; +// eslint-disable-next-line no-restricted-syntax +enum NodeConnectionType { + AiAgent = 'ai_agent', + AiChain = 'ai_chain', + AiDocument = 'ai_document', + AiEmbedding = 'ai_embedding', + AiLanguageModel = 'ai_languageModel', + AiMemory = 'ai_memory', + AiOutputParser = 'ai_outputParser', + AiRetriever = 'ai_retriever', + AiTextSplitter = 'ai_textSplitter', + AiTool = 'ai_tool', + AiVectorStore = 'ai_vectorStore', + Main = 'main', +} + import type { JSExecSettings } from '@/js-task-runner/js-task-runner'; import type { DataRequestResponse } from '@/runner-types'; import type { Task } from '@/task-runner'; @@ -76,6 +92,7 @@ export const newDataRequestResponse = ( id: '1', name: 'Test Workflow', active: true, + // @ts-expect-error test connections: { [manualTriggerNode.name]: { main: [[{ node: codeNode.name, type: NodeConnectionType.Main, index: 0 }]], diff --git a/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/built-ins-parser.ts b/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/built-ins-parser.ts index dd2d849c6a..2ec39ced48 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/built-ins-parser.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/built-ins-parser.ts @@ -1,8 +1,37 @@ import type { CallExpression, Identifier, Node, Program } from 'acorn'; import { parse } from 'acorn'; import { ancestor } from 'acorn-walk'; -import type { Result } from 'n8n-workflow'; -import { toResult } from 'n8n-workflow'; +// import type { Result } from 'n8n-workflow'; + +const ensureError = (e: unknown) => { + if (e instanceof Error) { + return e; + } + return new Error(String(e)); +}; + +export type ResultOk = { ok: true; result: T }; +export type ResultError = { ok: false; error: E }; +export type Result = ResultOk | ResultError; + +export const createResultOk = (data: T): ResultOk => ({ + ok: true, + result: data, +}); + +export const createResultError = (error: E): ResultError => ({ + ok: false, + error, +}); + +export const toResult = (fn: () => T): Result => { + try { + return createResultOk(fn()); + } catch (e) { + const error = ensureError(e); + return createResultError(error as E); + } +}; import { isAssignmentExpression, diff --git a/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts b/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts index c64d58636b..6e9fea457b 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts @@ -1,8 +1,6 @@ -import { getAdditionalKeys } from 'n8n-core'; -import { WorkflowDataProxy, Workflow } from 'n8n-workflow'; +// import { getAdditionalKeys } from 'n8n-core'; import type { CodeExecutionMode, - IWorkflowExecuteAdditionalData, IDataObject, INodeExecutionData, INodeParameters, @@ -15,8 +13,10 @@ import type { IExecuteData, INodeTypeDescription, } from 'n8n-workflow'; +// import { Workflow } from 'n8n-workflow/Workflow'; +// import { WorkflowDataProxy } from 'n8n-workflow/WorkflowDataProxy'; import * as a from 'node:assert'; -import { runInNewContext, type Context } from 'node:vm'; +// import { runInNewContext, type Context } from 'node:vm'; import type { MainConfig } from '@/config/main-config'; import type { DataRequestResponse, PartialAdditionalData, TaskResultData } from '@/runner-types'; @@ -29,7 +29,7 @@ import { ExecutionError } from './errors/execution-error'; import { makeSerializable } from './errors/serializable-error'; import type { RequireResolver } from './require-resolver'; import { createRequireResolver } from './require-resolver'; -import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation'; +// import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation'; import { DataRequestResponseReconstruct } from '../data-request/data-request-response-reconstruct'; export interface JSExecSettings { @@ -109,31 +109,33 @@ export class JsTaskRunner extends TaskRunner { await this.requestNodeTypeIfNeeded(neededBuiltIns, data.workflow, task.taskId); const workflowParams = data.workflow; - const workflow = new Workflow({ - ...workflowParams, - nodeTypes: this.nodeTypes, - }); - const customConsole = { - // Send log output back to the main process. It will take care of forwarding - // it to the UI or printing to console. - log: (...args: unknown[]) => { - const logOutput = args - .map((arg) => (typeof arg === 'object' && arg !== null ? JSON.stringify(arg) : arg)) - .join(' '); - void this.makeRpcCall(task.taskId, 'logNodeOutput', [logOutput]); - }, - }; + return {} as unknown as TaskResultData; + // const workflow = new Workflow({ + // ...workflowParams, + // nodeTypes: this.nodeTypes, + // }); - const result = - settings.nodeMode === 'runOnceForAllItems' - ? await this.runForAllItems(task.taskId, settings, data, workflow, customConsole) - : await this.runForEachItem(task.taskId, settings, data, workflow, customConsole); + // const customConsole = { + // // Send log output back to the main process. It will take care of forwarding + // // it to the UI or printing to console. + // log: (...args: unknown[]) => { + // const logOutput = args + // .map((arg) => (typeof arg === 'object' && arg !== null ? JSON.stringify(arg) : arg)) + // .join(' '); + // void this.makeRpcCall(task.taskId, 'logNodeOutput', [logOutput]); + // }, + // }; - return { - result, - customData: data.runExecutionData.resultData.metadata, - }; + // const result = + // settings.nodeMode === 'runOnceForAllItems' + // ? await this.runForAllItems(task.taskId, settings, data, workflow, customConsole) + // : await this.runForEachItem(task.taskId, settings, data, workflow, customConsole); + + // return { + // result, + // customData: data.runExecutionData.resultData.metadata, + // }; } private getNativeVariables() { @@ -163,155 +165,153 @@ export class JsTaskRunner extends TaskRunner { /** * Executes the requested code for all items in a single run */ - private async runForAllItems( - taskId: string, - settings: JSExecSettings, - data: JsTaskData, - workflow: Workflow, - customConsole: CustomConsole, - ): Promise { - const dataProxy = this.createDataProxy(data, workflow, data.itemIndex); - const inputItems = data.connectionInputData; + // private async runForAllItems( + // taskId: string, + // settings: JSExecSettings, + // data: JsTaskData, + // workflow: Workflow, + // customConsole: CustomConsole, + // ): Promise { + // const dataProxy = this.createDataProxy(data, workflow, data.itemIndex); + // const inputItems = data.connectionInputData; - const context: Context = { - require: this.requireResolver, - module: {}, - console: customConsole, - items: inputItems, + // const context: Context = { + // require: this.requireResolver, + // module: {}, + // console: customConsole, + // items: inputItems, - ...this.getNativeVariables(), - ...dataProxy, - ...this.buildRpcCallObject(taskId), - }; + // ...this.getNativeVariables(), + // ...dataProxy, + // ...this.buildRpcCallObject(taskId), + // }; - try { - const result = (await runInNewContext( - `globalThis.global = globalThis; module.exports = async function VmCodeWrapper() {${settings.code}\n}()`, - context, - )) as TaskResultData['result']; + // try { + // const result = (await runInNewContext( + // `globalThis.global = globalThis; module.exports = async function VmCodeWrapper() {${settings.code}\n}()`, + // context, + // )) as TaskResultData['result']; - if (result === null) { - return []; - } + // if (result === null) { + // return []; + // } - return validateRunForAllItemsOutput(result); - } catch (e) { - // Errors thrown by the VM are not instances of Error, so map them to an ExecutionError - const error = this.toExecutionErrorIfNeeded(e); + // // @ts-expect-error test + // return validateRunForAllItemsOutput(result); + // } catch (e) { + // // Errors thrown by the VM are not instances of Error, so map them to an ExecutionError + // const error = this.toExecutionErrorIfNeeded(e); - if (settings.continueOnFail) { - return [{ json: { error: error.message } }]; - } + // if (settings.continueOnFail) { + // return [{ json: { error: error.message } }]; + // } - throw error; - } - } + // throw error; + // } + // } - /** - * Executes the requested code for each item in the input data - */ - private async runForEachItem( - taskId: string, - settings: JSExecSettings, - data: JsTaskData, - workflow: Workflow, - customConsole: CustomConsole, - ): Promise { - const inputItems = data.connectionInputData; - const returnData: INodeExecutionData[] = []; + // /** + // * Executes the requested code for each item in the input data + // */ + // private async runForEachItem( + // taskId: string, + // settings: JSExecSettings, + // data: JsTaskData, + // workflow: Workflow, + // customConsole: CustomConsole, + // ): Promise { + // const inputItems = data.connectionInputData; + // const returnData: INodeExecutionData[] = []; - for (let index = 0; index < inputItems.length; index++) { - const item = inputItems[index]; - const dataProxy = this.createDataProxy(data, workflow, index); - const context: Context = { - require: this.requireResolver, - module: {}, - console: customConsole, - item, + // for (let index = 0; index < inputItems.length; index++) { + // const item = inputItems[index]; + // const dataProxy = this.createDataProxy(data, workflow, index); + // const context: Context = { + // require: this.requireResolver, + // module: {}, + // console: customConsole, + // item, - ...this.getNativeVariables(), - ...dataProxy, - ...this.buildRpcCallObject(taskId), - }; + // ...this.getNativeVariables(), + // ...dataProxy, + // ...this.buildRpcCallObject(taskId), + // }; - try { - let result = (await runInNewContext( - `module.exports = async function VmCodeWrapper() {${settings.code}\n}()`, - context, - )) as INodeExecutionData | undefined; + // try { + // let result = (await runInNewContext( + // `module.exports = async function VmCodeWrapper() {${settings.code}\n}()`, + // context, + // )) as INodeExecutionData | undefined; - // Filter out null values - if (result === null) { - continue; - } + // // Filter out null values + // if (result === null) { + // continue; + // } - result = validateRunForEachItemOutput(result, index); - if (result) { - returnData.push( - result.binary - ? { - json: result.json, - pairedItem: { item: index }, - binary: result.binary, - } - : { - json: result.json, - pairedItem: { item: index }, - }, - ); - } - } catch (e) { - // Errors thrown by the VM are not instances of Error, so map them to an ExecutionError - const error = this.toExecutionErrorIfNeeded(e); + // // @ts-expect-error test + // result = validateRunForEachItemOutput(result, index); + // if (result) { + // returnData.push( + // result.binary + // ? { + // json: result.json, + // pairedItem: { item: index }, + // binary: result.binary, + // } + // : { + // json: result.json, + // pairedItem: { item: index }, + // }, + // ); + // } + // } catch (e) { + // // Errors thrown by the VM are not instances of Error, so map them to an ExecutionError + // const error = this.toExecutionErrorIfNeeded(e); - if (!settings.continueOnFail) { - throw error; - } + // if (!settings.continueOnFail) { + // throw error; + // } - returnData.push({ - json: { error: error.message }, - pairedItem: { - item: index, - }, - }); - } - } + // returnData.push({ + // json: { error: error.message }, + // pairedItem: { + // item: index, + // }, + // }); + // } + // } - return returnData; - } + // return returnData; + // } - private createDataProxy(data: JsTaskData, workflow: Workflow, itemIndex: number) { - return new WorkflowDataProxy( - workflow, - data.runExecutionData, - data.runIndex, - itemIndex, - data.activeNodeName, - data.connectionInputData, - data.siblingParameters, - data.mode, - getAdditionalKeys( - data.additionalData as IWorkflowExecuteAdditionalData, - data.mode, - data.runExecutionData, - ), - data.executeData, - data.defaultReturnRunIndex, - data.selfData, - data.contextNodeName, - // Make sure that even if we don't receive the envProviderState for - // whatever reason, we don't expose the task runner's env to the code - data.envProviderState ?? { - env: {}, - isEnvAccessBlocked: false, - isProcessAvailable: true, - }, - // Because we optimize the needed data, it can be partially available. - // We assign the available built-ins to the execution context, which - // means we run the getter for '$json', and by default $json throws - // if there is no data available. - ).getDataProxy({ throwOnMissingExecutionData: false }); - } + // private createDataProxy(data: JsTaskData, workflow: Workflow, itemIndex: number) { + // return new WorkflowDataProxy( + // workflow, + // data.runExecutionData, + // data.runIndex, + // itemIndex, + // data.activeNodeName, + // data.connectionInputData, + // data.siblingParameters, + // data.mode, + // {}, + // data.executeData, + // data.defaultReturnRunIndex, + // data.selfData, + // data.contextNodeName, + // // Make sure that even if we don't receive the envProviderState for + // // whatever reason, we don't expose the task runner's env to the code + // data.envProviderState ?? { + // env: {}, + // isEnvAccessBlocked: false, + // isProcessAvailable: true, + // }, + // // Because we optimize the needed data, it can be partially available. + // // We assign the available built-ins to the execution context, which + // // means we run the getter for '$json', and by default $json throws + // // if there is no data available. + // ).getDataProxy({ throwOnMissingExecutionData: false }); + // } private toExecutionErrorIfNeeded(error: unknown): Error { if (error instanceof Error) { diff --git a/packages/@n8n/task-runner/src/js-task-runner/require-resolver.ts b/packages/@n8n/task-runner/src/js-task-runner/require-resolver.ts index ffa00c0441..44c02484c2 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/require-resolver.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/require-resolver.ts @@ -1,4 +1,4 @@ -import { ApplicationError } from 'n8n-workflow'; +// import { ApplicationError } from 'n8n-workflow'; import { isBuiltin } from 'node:module'; import { ExecutionError } from './errors/execution-error'; @@ -19,6 +19,8 @@ export type RequireResolverOpts = { export type RequireResolver = (request: string) => unknown; +const ApplicationError = Error; + export function createRequireResolver({ allowedBuiltInModules, allowedExternalModules, diff --git a/packages/@n8n/task-runner/src/js-task-runner/result-validation.ts b/packages/@n8n/task-runner/src/js-task-runner/result-validation.ts index b7d0ffc5fc..44313a3a52 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/result-validation.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/result-validation.ts @@ -1,4 +1,4 @@ -import { normalizeItems } from 'n8n-core'; +// import { normalizeItems } from 'n8n-core'; import type { INodeExecutionData } from 'n8n-workflow'; import { ValidationError } from './errors/validation-error'; @@ -71,9 +71,9 @@ export function validateRunForAllItemsOutput( } } - const returnData = normalizeItems(executionResult); - returnData.forEach(validateItem); - return returnData; + // const returnData = normalizeItems(executionResult); + // returnData.forEach(validateItem); + return executionResult; } /** @@ -103,14 +103,14 @@ export function validateRunForEachItemOutput( }); } - const [returnData] = normalizeItems([executionResult]); + // const [returnData] = normalizeItems([executionResult]); - validateItem(returnData, itemIndex); + // validateItem(returnData, itemIndex); // If at least one top-level key is a supported item key (`json`, `binary`, etc.), // and another top-level key is unrecognized, then the user mis-added a property // directly on the item, when they intended to add it on the `json` property - validateTopLevelKeys(returnData, itemIndex); + // validateTopLevelKeys(returnData, itemIndex); - return returnData; + return [executionResult]; } diff --git a/packages/@n8n/task-runner/src/start.ts b/packages/@n8n/task-runner/src/start.ts index fcaab84d51..be4282e04c 100644 --- a/packages/@n8n/task-runner/src/start.ts +++ b/packages/@n8n/task-runner/src/start.ts @@ -1,4 +1,4 @@ -import { ensureError } from 'n8n-workflow'; +// import { ensureError } from 'n8n-workflow'; import Container from 'typedi'; import { MainConfig } from './config/main-config'; @@ -7,6 +7,13 @@ import { JsTaskRunner } from './js-task-runner/js-task-runner'; let runner: JsTaskRunner | undefined; let isShuttingDown = false; +const ensureError = (e: unknown) => { + if (e instanceof Error) { + return e; + } + return new Error(String(e)); +}; + function createSignalHandler(signal: string) { return async function onSignal() { if (isShuttingDown) { diff --git a/packages/@n8n/task-runner/src/task-runner.ts b/packages/@n8n/task-runner/src/task-runner.ts index d1bf8a6d71..8462b4ad82 100644 --- a/packages/@n8n/task-runner/src/task-runner.ts +++ b/packages/@n8n/task-runner/src/task-runner.ts @@ -1,4 +1,3 @@ -import { ApplicationError, ensureError } from 'n8n-workflow'; import { nanoid } from 'nanoid'; import { type MessageEvent, WebSocket } from 'ws'; @@ -7,6 +6,15 @@ import type { BrokerMessage, RunnerMessage } from '@/message-types'; import { TaskRunnerNodeTypes } from '@/node-types'; import { RPC_ALLOW_LIST, type TaskResultData } from '@/runner-types'; +const ensureError = (e: unknown) => { + if (e instanceof Error) { + return e; + } + return new Error(String(e)); +}; + +class ApplicationError extends Error {} + export interface Task { taskId: string; settings?: T; diff --git a/packages/workflow/package.json b/packages/workflow/package.json index bbb75b7540..f6a568555b 100644 --- a/packages/workflow/package.json +++ b/packages/workflow/package.json @@ -11,6 +11,16 @@ "import": "./src/index.ts", "types": "./dist/index.d.ts" }, + "./Workflow": { + "import": "./src/Workflow.ts", + "require": "./dist/Workflow.js", + "types": "./dist/Workflow.d.ts" + }, + "./WorkflowDataProxy": { + "import": "./src/WorkflowDataProxy.ts", + "require": "./dist/WorkflowDataProxy.js", + "types": "./dist/WorkflowDataProxy.d.ts" + }, "./*": "./*" }, "scripts": { diff --git a/packages/workflow/tsconfig.build.json b/packages/workflow/tsconfig.build.json index 6aa59ada37..3fb1aeee2b 100644 --- a/packages/workflow/tsconfig.build.json +++ b/packages/workflow/tsconfig.build.json @@ -4,7 +4,12 @@ "composite": true, "rootDir": "src", "outDir": "dist", - "tsBuildInfoFile": "dist/build.tsbuildinfo" + "tsBuildInfoFile": "dist/build.tsbuildinfo", + "paths": { + "@/*": ["./*"], + "n8n-workflow/Workflow": ["./packages/n8n-workflow/src/Workflow"], + "n8n-workflow/WorkflowDataProxy": ["./packages/n8n-workflow/src/WorkflowDataProxy"] + } }, "include": ["src/**/*.ts"], "exclude": ["test/**", "src/**/__tests__/**"] diff --git a/packages/workflow/tsconfig.json b/packages/workflow/tsconfig.json index 2f0507b565..9cd77b8dc6 100644 --- a/packages/workflow/tsconfig.json +++ b/packages/workflow/tsconfig.json @@ -4,7 +4,9 @@ "rootDir": ".", "baseUrl": "src", "paths": { - "@/*": ["./*"] + "@/*": ["./*"], + "n8n-workflow/Workflow": ["./packages/n8n-workflow/src/Workflow"], + "n8n-workflow/WorkflowDataProxy": ["./packages/n8n-workflow/src/WorkflowDataProxy"] }, "tsBuildInfoFile": "dist/typecheck.tsbuildinfo" },