From 2bb1996738abfc9fc9ac6ff61f96c99e75b69619 Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Mon, 7 Oct 2024 21:18:32 +0300 Subject: [PATCH] feat: Add once for each item support for JS task runner (no-changelog) (#11109) --- .../InformationExtractor.node.ts | 2 +- .../nodes-langchain/nodes/code/Code.node.ts | 4 +- .../OutputParserStructured.node.ts | 2 +- .../nodes/tools/ToolCode/ToolCode.node.ts | 8 +- .../tools/ToolWorkflow/ToolWorkflow.node.ts | 2 +- .../nodes-langchain/utils/schemaParsing.ts | 1 - packages/@n8n/task-runner/package.json | 7 +- .../task-runner/src/__tests__/code.test.ts | 319 ++++++++++++++++++ .../task-runner/src/__tests__/test-data.ts | 148 ++++++++ packages/@n8n/task-runner/src/code.ts | 218 +++++++++--- .../@n8n/task-runner/src/execution-error.ts | 84 +++++ packages/@n8n/task-runner/src/obj-utils.ts | 5 + .../@n8n/task-runner/src/result-validation.ts | 116 +++++++ packages/@n8n/task-runner/src/task-runner.ts | 7 +- .../@n8n/task-runner/src/validation-error.ts | 44 +++ .../nodes/AiTransform/AiTransform.node.ts | 2 +- packages/nodes-base/nodes/Code/Code.node.ts | 25 +- .../nodes-base/nodes/Code/ExecutionError.ts | 4 +- .../nodes/Code/JavaScriptSandbox.ts | 52 +-- .../nodes-base/nodes/Code/JsCodeValidator.ts | 54 +++ .../nodes/Code/JsTaskRunnerSandbox.ts | 92 +++++ .../nodes-base/nodes/Code/PythonSandbox.ts | 10 +- packages/nodes-base/nodes/Code/Sandbox.ts | 40 +-- 23 files changed, 1104 insertions(+), 142 deletions(-) create mode 100644 packages/@n8n/task-runner/src/__tests__/code.test.ts create mode 100644 packages/@n8n/task-runner/src/__tests__/test-data.ts create mode 100644 packages/@n8n/task-runner/src/execution-error.ts create mode 100644 packages/@n8n/task-runner/src/obj-utils.ts create mode 100644 packages/@n8n/task-runner/src/result-validation.ts create mode 100644 packages/@n8n/task-runner/src/validation-error.ts create mode 100644 packages/nodes-base/nodes/Code/JsCodeValidator.ts create mode 100644 packages/nodes-base/nodes/Code/JsTaskRunnerSandbox.ts diff --git a/packages/@n8n/nodes-langchain/nodes/chains/InformationExtractor/InformationExtractor.node.ts b/packages/@n8n/nodes-langchain/nodes/chains/InformationExtractor/InformationExtractor.node.ts index 0399fc2d5a..7ccfddc5e4 100644 --- a/packages/@n8n/nodes-langchain/nodes/chains/InformationExtractor/InformationExtractor.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/chains/InformationExtractor/InformationExtractor.node.ts @@ -262,7 +262,7 @@ export class InformationExtractor implements INodeType { } const zodSchemaSandbox = getSandboxWithZod(this, jsonSchema, 0); - const zodSchema = (await zodSchemaSandbox.runCode()) as z.ZodSchema; + const zodSchema = await zodSchemaSandbox.runCode>(); parser = OutputFixingParser.fromLLM(llm, StructuredOutputParser.fromZodSchema(zodSchema)); } diff --git a/packages/@n8n/nodes-langchain/nodes/code/Code.node.ts b/packages/@n8n/nodes-langchain/nodes/code/Code.node.ts index 085dec98c2..abe9b01530 100644 --- a/packages/@n8n/nodes-langchain/nodes/code/Code.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/code/Code.node.ts @@ -107,7 +107,7 @@ function getSandbox( } // eslint-disable-next-line @typescript-eslint/unbound-method - const sandbox = new JavaScriptSandbox(context, code, itemIndex, this.helpers, { + const sandbox = new JavaScriptSandbox(context, code, this.helpers, { resolver: vmResolver, }); @@ -368,7 +368,7 @@ export class Code implements INodeType { } const sandbox = getSandbox.call(this, code.supplyData.code, { itemIndex }); - const response = (await sandbox.runCode()) as Tool; + const response = await sandbox.runCode(); return { response: logWrapper(response, this), diff --git a/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserStructured/OutputParserStructured.node.ts b/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserStructured/OutputParserStructured.node.ts index 027ce04e6e..6ce6bff76b 100644 --- a/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserStructured/OutputParserStructured.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/output_parser/OutputParserStructured/OutputParserStructured.node.ts @@ -48,7 +48,7 @@ export class N8nStructuredOutputParser extends Structure sandboxedSchema: JavaScriptSandbox, nodeVersion: number, ): Promise>> { - const zodSchema = (await sandboxedSchema.runCode()) as z.ZodSchema; + const zodSchema = await sandboxedSchema.runCode>(); let returnSchema: z.ZodSchema; if (nodeVersion === 1) { diff --git a/packages/@n8n/nodes-langchain/nodes/tools/ToolCode/ToolCode.node.ts b/packages/@n8n/nodes-langchain/nodes/tools/ToolCode/ToolCode.node.ts index df68fb0c6a..7980f5fa9d 100644 --- a/packages/@n8n/nodes-langchain/nodes/tools/ToolCode/ToolCode.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/tools/ToolCode/ToolCode.node.ts @@ -199,9 +199,9 @@ export class ToolCode implements INodeType { let sandbox: Sandbox; if (language === 'javaScript') { - sandbox = new JavaScriptSandbox(context, code, index, this.helpers); + sandbox = new JavaScriptSandbox(context, code, this.helpers); } else { - sandbox = new PythonSandbox(context, code, index, this.helpers); + sandbox = new PythonSandbox(context, code, this.helpers); } sandbox.on( @@ -216,7 +216,7 @@ export class ToolCode implements INodeType { const runFunction = async (query: string | IDataObject): Promise => { const sandbox = getSandbox(query, itemIndex); - return await (sandbox.runCode() as Promise); + return await sandbox.runCode(); }; const toolHandler = async (query: string | IDataObject): Promise => { @@ -274,7 +274,7 @@ export class ToolCode implements INodeType { : jsonParse(inputSchema); const zodSchemaSandbox = getSandboxWithZod(this, jsonSchema, 0); - const zodSchema = (await zodSchemaSandbox.runCode()) as DynamicZodObject; + const zodSchema = await zodSchemaSandbox.runCode(); tool = new DynamicStructuredTool({ schema: zodSchema, diff --git a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts index 0b00e17ac4..352a727d11 100644 --- a/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts +++ b/packages/@n8n/nodes-langchain/nodes/tools/ToolWorkflow/ToolWorkflow.node.ts @@ -530,7 +530,7 @@ export class ToolWorkflow implements INodeType { : jsonParse(inputSchema); const zodSchemaSandbox = getSandboxWithZod(this, jsonSchema, 0); - const zodSchema = (await zodSchemaSandbox.runCode()) as DynamicZodObject; + const zodSchema = await zodSchemaSandbox.runCode(); tool = new DynamicStructuredTool({ schema: zodSchema, diff --git a/packages/@n8n/nodes-langchain/utils/schemaParsing.ts b/packages/@n8n/nodes-langchain/utils/schemaParsing.ts index 8d5f61153d..0591483e2c 100644 --- a/packages/@n8n/nodes-langchain/utils/schemaParsing.ts +++ b/packages/@n8n/nodes-langchain/utils/schemaParsing.ts @@ -57,7 +57,6 @@ export function getSandboxWithZod(ctx: IExecuteFunctions, schema: JSONSchema7, i const itemSchema = new Function('z', 'return (' + zodSchema + ')')(z) return itemSchema `, - itemIndex, ctx.helpers, { resolver: vmResolver }, ); diff --git a/packages/@n8n/task-runner/package.json b/packages/@n8n/task-runner/package.json index 2ce5993bb9..af5e650fce 100644 --- a/packages/@n8n/task-runner/package.json +++ b/packages/@n8n/task-runner/package.json @@ -6,13 +6,14 @@ "start": "node dist/start.js", "dev": "pnpm build && pnpm start", "typecheck": "tsc --noEmit", - "build": "tsc -p ./tsconfig.build.json", + "build": "tsc -p ./tsconfig.build.json && tsc-alias -p tsconfig.build.json", "format": "biome format --write src", "format:check": "biome ci src", - "test": "echo \"Error: no tests in this package\" && exit 0", + "test": "jest", + "test:watch": "jest --watch", "lint": "eslint . --quiet", "lintfix": "eslint . --fix", - "watch": "tsc -p tsconfig.build.json --watch" + "watch": "concurrently \"tsc -w -p tsconfig.build.json\" \"tsc-alias -w -p tsconfig.build.json\"" }, "main": "dist/start.js", "module": "src/start.ts", diff --git a/packages/@n8n/task-runner/src/__tests__/code.test.ts b/packages/@n8n/task-runner/src/__tests__/code.test.ts new file mode 100644 index 0000000000..8198627374 --- /dev/null +++ b/packages/@n8n/task-runner/src/__tests__/code.test.ts @@ -0,0 +1,319 @@ +import type { CodeExecutionMode, IDataObject, WorkflowExecuteMode } from 'n8n-workflow'; + +import { JsTaskRunner, type AllCodeTaskData, type JSExecSettings } from '@/code'; +import type { Task } from '@/task-runner'; +import { ValidationError } from '@/validation-error'; + +import { newAllCodeTaskData, newTaskWithSettings, withPairedItem, wrapIntoJson } from './test-data'; + +jest.mock('ws'); + +describe('JsTaskRunner', () => { + const jsTaskRunner = new JsTaskRunner('taskType', 'ws://localhost', 'grantToken', 1); + + const execTaskWithParams = async ({ + task, + taskData, + }: { + task: Task; + taskData: AllCodeTaskData; + }) => { + jest.spyOn(jsTaskRunner, 'requestData').mockResolvedValue(taskData); + return await jsTaskRunner.executeTask(task); + }; + + afterEach(() => { + jest.restoreAllMocks(); + }); + + describe('console', () => { + test.each<[CodeExecutionMode, WorkflowExecuteMode]>([ + ['runOnceForAllItems', 'cli'], + ['runOnceForAllItems', 'error'], + ['runOnceForAllItems', 'integrated'], + ['runOnceForAllItems', 'internal'], + ['runOnceForAllItems', 'retry'], + ['runOnceForAllItems', 'trigger'], + ['runOnceForAllItems', 'webhook'], + ['runOnceForEachItem', 'cli'], + ['runOnceForEachItem', 'error'], + ['runOnceForEachItem', 'integrated'], + ['runOnceForEachItem', 'internal'], + ['runOnceForEachItem', 'retry'], + ['runOnceForEachItem', 'trigger'], + ['runOnceForEachItem', 'webhook'], + ])( + 'should make an rpc call for console log in %s mode when workflow mode is %s', + async (nodeMode, workflowMode) => { + jest.spyOn(console, 'log').mockImplementation(() => {}); + jest.spyOn(jsTaskRunner, 'makeRpcCall').mockResolvedValue(undefined); + const task = newTaskWithSettings({ + code: "console.log('Hello', 'world!'); return {}", + nodeMode, + workflowMode, + }); + + await execTaskWithParams({ + task, + taskData: newAllCodeTaskData([wrapIntoJson({})]), + }); + + expect(console.log).toHaveBeenCalledWith('[JS Code]', 'Hello world!'); + expect(jsTaskRunner.makeRpcCall).toHaveBeenCalledWith(task.taskId, 'logNodeOutput', [ + 'Hello world!', + ]); + }, + ); + + test.each<[CodeExecutionMode, WorkflowExecuteMode]>([ + ['runOnceForAllItems', 'manual'], + ['runOnceForEachItem', 'manual'], + ])( + "shouldn't make an rpc call for console log in %s mode when workflow mode is %s", + async (nodeMode, workflowMode) => { + jest.spyOn(jsTaskRunner, 'makeRpcCall').mockResolvedValue(undefined); + const task = newTaskWithSettings({ + code: "console.log('Hello', 'world!'); return {}", + nodeMode, + workflowMode, + }); + + await execTaskWithParams({ + task, + taskData: newAllCodeTaskData([wrapIntoJson({})]), + }); + + expect(jsTaskRunner.makeRpcCall).not.toHaveBeenCalled(); + }, + ); + }); + + describe('runOnceForAllItems', () => { + const executeForAllItems = async ({ + code, + inputItems, + settings, + }: { code: string; inputItems: IDataObject[]; settings?: Partial }) => { + return await execTaskWithParams({ + task: newTaskWithSettings({ + code, + nodeMode: 'runOnceForAllItems', + ...settings, + }), + taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson)), + }); + }; + + describe('continue on fail', () => { + it('should return an item with the error if continueOnFail is true', async () => { + const outcome = await executeForAllItems({ + code: 'throw new Error("Error message")', + inputItems: [{ a: 1 }], + settings: { continueOnFail: true }, + }); + + expect(outcome).toEqual({ + result: [wrapIntoJson({ error: 'Error message' })], + customData: undefined, + }); + }); + + it('should throw an error if continueOnFail is false', async () => { + await expect( + executeForAllItems({ + code: 'throw new Error("Error message")', + inputItems: [{ a: 1 }], + settings: { continueOnFail: false }, + }), + ).rejects.toThrow('Error message'); + }); + }); + + describe('invalid output', () => { + test.each([['undefined'], ['42'], ['"a string"']])( + 'should throw a ValidationError if the code output is %s', + async (output) => { + await expect( + executeForAllItems({ + code: `return ${output}`, + inputItems: [{ a: 1 }], + }), + ).rejects.toThrow(ValidationError); + }, + ); + + it('should throw a ValidationError if some items are wrapped in json and some are not', async () => { + await expect( + executeForAllItems({ + code: 'return [{b: 1}, {json: {b: 2}}]', + inputItems: [{ a: 1 }], + }), + ).rejects.toThrow(ValidationError); + }); + }); + + it('should return static items', async () => { + const outcome = await executeForAllItems({ + code: 'return [{json: {b: 1}}]', + inputItems: [{ a: 1 }], + }); + + expect(outcome).toEqual({ + result: [wrapIntoJson({ b: 1 })], + customData: undefined, + }); + }); + + it('maps null into an empty array', async () => { + const outcome = await executeForAllItems({ + code: 'return null', + inputItems: [{ a: 1 }], + }); + + expect(outcome).toEqual({ + result: [], + customData: undefined, + }); + }); + + it("should wrap items into json if they aren't", async () => { + const outcome = await executeForAllItems({ + code: 'return [{b: 1}]', + inputItems: [{ a: 1 }], + }); + + expect(outcome).toEqual({ + result: [wrapIntoJson({ b: 1 })], + customData: undefined, + }); + }); + + it('should wrap single item into an array and json', async () => { + const outcome = await executeForAllItems({ + code: 'return {b: 1}', + inputItems: [{ a: 1 }], + }); + + expect(outcome).toEqual({ + result: [wrapIntoJson({ b: 1 })], + customData: undefined, + }); + }); + + test.each([['items'], ['$input.all()'], ["$('Trigger').all()"]])( + 'should have all input items in the context as %s', + async (expression) => { + const outcome = await executeForAllItems({ + code: `return ${expression}`, + inputItems: [{ a: 1 }, { a: 2 }], + }); + + expect(outcome).toEqual({ + result: [wrapIntoJson({ a: 1 }), wrapIntoJson({ a: 2 })], + customData: undefined, + }); + }, + ); + }); + + describe('runForEachItem', () => { + const executeForEachItem = async ({ + code, + inputItems, + settings, + }: { code: string; inputItems: IDataObject[]; settings?: Partial }) => { + return await execTaskWithParams({ + task: newTaskWithSettings({ + code, + nodeMode: 'runOnceForEachItem', + ...settings, + }), + taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson)), + }); + }; + + describe('continue on fail', () => { + it('should return an item with the error if continueOnFail is true', async () => { + const outcome = await executeForEachItem({ + code: 'throw new Error("Error message")', + inputItems: [{ a: 1 }, { a: 2 }], + settings: { continueOnFail: true }, + }); + + expect(outcome).toEqual({ + result: [ + withPairedItem(0, wrapIntoJson({ error: 'Error message' })), + withPairedItem(1, wrapIntoJson({ error: 'Error message' })), + ], + customData: undefined, + }); + }); + + it('should throw an error if continueOnFail is false', async () => { + await expect( + executeForEachItem({ + code: 'throw new Error("Error message")', + inputItems: [{ a: 1 }], + settings: { continueOnFail: false }, + }), + ).rejects.toThrow('Error message'); + }); + }); + + describe('invalid output', () => { + test.each([['undefined'], ['42'], ['"a string"'], ['[]'], ['[1,2,3]']])( + 'should throw a ValidationError if the code output is %s', + async (output) => { + await expect( + executeForEachItem({ + code: `return ${output}`, + inputItems: [{ a: 1 }], + }), + ).rejects.toThrow(ValidationError); + }, + ); + }); + + it('should return static items', async () => { + const outcome = await executeForEachItem({ + code: 'return {json: {b: 1}}', + inputItems: [{ a: 1 }], + }); + + expect(outcome).toEqual({ + result: [withPairedItem(0, wrapIntoJson({ b: 1 }))], + customData: undefined, + }); + }); + + it('should filter out null values', async () => { + const outcome = await executeForEachItem({ + code: 'return item.json.a === 1 ? item : null', + inputItems: [{ a: 1 }, { a: 2 }, { a: 3 }], + }); + + expect(outcome).toEqual({ + result: [withPairedItem(0, wrapIntoJson({ a: 1 }))], + customData: undefined, + }); + }); + + test.each([['item'], ['$input.item'], ['{ json: $json }']])( + 'should have the current input item in the context as %s', + async (expression) => { + const outcome = await executeForEachItem({ + code: `return ${expression}`, + inputItems: [{ a: 1 }, { a: 2 }], + }); + + expect(outcome).toEqual({ + result: [ + withPairedItem(0, wrapIntoJson({ a: 1 })), + withPairedItem(1, wrapIntoJson({ a: 2 })), + ], + customData: undefined, + }); + }, + ); + }); +}); diff --git a/packages/@n8n/task-runner/src/__tests__/test-data.ts b/packages/@n8n/task-runner/src/__tests__/test-data.ts new file mode 100644 index 0000000000..a7d6af2d46 --- /dev/null +++ b/packages/@n8n/task-runner/src/__tests__/test-data.ts @@ -0,0 +1,148 @@ +import type { IDataObject, INode, INodeExecutionData, ITaskData } from 'n8n-workflow'; +import { NodeConnectionType } from 'n8n-workflow'; +import { nanoid } from 'nanoid'; + +import type { AllCodeTaskData, JSExecSettings } from '@/code'; +import type { Task } from '@/task-runner'; + +/** + * Creates a new task with the given settings + */ +export const newTaskWithSettings = ( + settings: Partial & Pick, +): Task => ({ + taskId: '1', + settings: { + workflowMode: 'manual', + continueOnFail: false, + mode: 'manual', + ...settings, + }, + active: true, + cancelled: false, +}); + +/** + * Creates a new node with the given options + */ +export const newNode = (opts: Partial = {}): INode => ({ + id: nanoid(), + name: 'Test Node' + nanoid(), + parameters: {}, + position: [0, 0], + type: 'n8n-nodes-base.code', + typeVersion: 1, + ...opts, +}); + +/** + * Creates a new task data with the given options + */ +export const newTaskData = (opts: Partial & Pick): ITaskData => ({ + startTime: Date.now(), + executionTime: 0, + executionStatus: 'success', + ...opts, +}); + +/** + * Creates a new all code task data with the given options + */ +export const newAllCodeTaskData = ( + codeNodeInputData: INodeExecutionData[], + opts: Partial = {}, +): AllCodeTaskData => { + const codeNode = newNode({ + name: 'JsCode', + parameters: { + mode: 'runOnceForEachItem', + language: 'javaScript', + jsCode: 'return item', + }, + type: 'n8n-nodes-base.code', + typeVersion: 2, + }); + const manualTriggerNode = newNode({ + name: 'Trigger', + type: 'n8n-nodes-base.manualTrigger', + }); + + return { + workflow: { + id: '1', + name: 'Test Workflow', + active: true, + connections: { + [manualTriggerNode.name]: { + main: [[{ node: codeNode.name, type: NodeConnectionType.Main, index: 0 }]], + }, + }, + nodes: [manualTriggerNode, codeNode], + }, + inputData: { + main: [codeNodeInputData], + }, + connectionInputData: codeNodeInputData, + node: codeNode, + runExecutionData: { + startData: {}, + resultData: { + runData: { + [manualTriggerNode.name]: [ + newTaskData({ + source: [], + data: { + main: [codeNodeInputData], + }, + }), + ], + }, + pinData: {}, + lastNodeExecuted: manualTriggerNode.name, + }, + executionData: { + contextData: {}, + nodeExecutionStack: [], + metadata: {}, + waitingExecution: {}, + waitingExecutionSource: {}, + }, + }, + runIndex: 0, + itemIndex: 0, + activeNodeName: codeNode.name, + contextNodeName: codeNode.name, + defaultReturnRunIndex: -1, + siblingParameters: {}, + mode: 'manual', + selfData: {}, + additionalData: { + formWaitingBaseUrl: '', + instanceBaseUrl: '', + restartExecutionId: '', + restApiUrl: '', + webhookBaseUrl: '', + webhookTestBaseUrl: '', + webhookWaitingBaseUrl: '', + variables: {}, + }, + ...opts, + }; +}; + +/** + * Wraps the given value into an INodeExecutionData object's json property + */ +export const wrapIntoJson = (json: IDataObject): INodeExecutionData => ({ + json, +}); + +/** + * Adds the given index as the pairedItem property to the given INodeExecutionData object + */ +export const withPairedItem = (index: number, data: INodeExecutionData): INodeExecutionData => ({ + ...data, + pairedItem: { + item: index, + }, +}); diff --git a/packages/@n8n/task-runner/src/code.ts b/packages/@n8n/task-runner/src/code.ts index 6fcb6cf878..387558c409 100644 --- a/packages/@n8n/task-runner/src/code.ts +++ b/packages/@n8n/task-runner/src/code.ts @@ -1,28 +1,36 @@ import { getAdditionalKeys } from 'n8n-core'; import { - type INode, - type INodeType, - type ITaskDataConnections, - type IWorkflowExecuteAdditionalData, WorkflowDataProxy, - type WorkflowParameters, - type IDataObject, - type IExecuteData, - type INodeExecutionData, - type INodeParameters, - type IRunExecutionData, // type IWorkflowDataProxyAdditionalKeys, Workflow, - type WorkflowExecuteMode, +} from 'n8n-workflow'; +import type { + CodeExecutionMode, + INode, + INodeType, + ITaskDataConnections, + IWorkflowExecuteAdditionalData, + WorkflowParameters, + IDataObject, + IExecuteData, + INodeExecutionData, + INodeParameters, + IRunExecutionData, + WorkflowExecuteMode, } from 'n8n-workflow'; import * as a from 'node:assert'; import { runInNewContext, type Context } from 'node:vm'; +import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from '@/result-validation'; + import type { TaskResultData } from './runner-types'; import { type Task, TaskRunner } from './task-runner'; -interface JSExecSettings { +export interface JSExecSettings { code: string; + nodeMode: CodeExecutionMode; + workflowMode: WorkflowExecuteMode; + continueOnFail: boolean; // For workflow data proxy mode: WorkflowExecuteMode; @@ -62,6 +70,12 @@ export interface AllCodeTaskData { additionalData: PartialAdditionalData; } +type CustomConsole = { + log: (...args: unknown[]) => void; +}; + +const noop = () => {}; + export class JsTaskRunner extends TaskRunner { constructor( taskType: string, @@ -95,15 +109,154 @@ export class JsTaskRunner extends TaskRunner { }, }); - const dataProxy = new WorkflowDataProxy( + const customConsole = { + log: + settings.workflowMode === 'manual' + ? noop + : (...args: unknown[]) => { + const logOutput = args + .map((arg) => (typeof arg === 'object' && arg !== null ? JSON.stringify(arg) : arg)) + .join(' '); + console.log('[JS Code]', logOutput); + void this.makeRpcCall(task.taskId, 'logNodeOutput', [logOutput]); + }, + }; + + const result = + settings.nodeMode === 'runOnceForAllItems' + ? await this.runForAllItems(task.taskId, settings, allData, workflow, customConsole) + : await this.runForEachItem(task.taskId, settings, allData, workflow, customConsole); + + return { + result, + customData: allData.runExecutionData.resultData.metadata, + }; + } + + /** + * Executes the requested code for all items in a single run + */ + private async runForAllItems( + taskId: string, + settings: JSExecSettings, + allData: AllCodeTaskData, + workflow: Workflow, + customConsole: CustomConsole, + ): Promise { + const dataProxy = this.createDataProxy(allData, workflow, allData.itemIndex); + const inputItems = allData.connectionInputData; + + const context: Context = { + require, + module: {}, + console: customConsole, + + items: inputItems, + ...dataProxy, + ...this.buildRpcCallObject(taskId), + }; + + try { + const result = (await runInNewContext( + `module.exports = async function() {${settings.code}\n}()`, + context, + )) as TaskResultData['result']; + + if (result === null) { + return []; + } + + return validateRunForAllItemsOutput(result); + } catch (error) { + if (settings.continueOnFail) { + return [{ json: { error: this.getErrorMessageFromVmError(error) } }]; + } + + (error as Record).node = allData.node; + throw error; + } + } + + /** + * Executes the requested code for each item in the input data + */ + private async runForEachItem( + taskId: string, + settings: JSExecSettings, + allData: AllCodeTaskData, + workflow: Workflow, + customConsole: CustomConsole, + ): Promise { + const inputItems = allData.connectionInputData; + const returnData: INodeExecutionData[] = []; + + for (let index = 0; index < inputItems.length; index++) { + const item = inputItems[index]; + const dataProxy = this.createDataProxy(allData, workflow, index); + const context: Context = { + require, + module: {}, + console: customConsole, + item, + + ...dataProxy, + ...this.buildRpcCallObject(taskId), + }; + + try { + let result = (await runInNewContext( + `module.exports = async function() {${settings.code}\n}()`, + context, + )) as INodeExecutionData | undefined; + + // Filter out null values + if (result === null) { + continue; + } + + result = validateRunForEachItemOutput(result, index); + if (result) { + returnData.push( + result.binary + ? { + json: result.json, + pairedItem: { item: index }, + binary: result.binary, + } + : { + json: result.json, + pairedItem: { item: index }, + }, + ); + } + } catch (error) { + if (!settings.continueOnFail) { + (error as Record).node = allData.node; + throw error; + } + + returnData.push({ + json: { error: this.getErrorMessageFromVmError(error) }, + pairedItem: { + item: index, + }, + }); + } + } + + return returnData; + } + + private createDataProxy(allData: AllCodeTaskData, workflow: Workflow, itemIndex: number) { + return new WorkflowDataProxy( workflow, allData.runExecutionData, allData.runIndex, - allData.itemIndex, + itemIndex, allData.activeNodeName, allData.connectionInputData, allData.siblingParameters, - settings.mode, + allData.mode, getAdditionalKeys( allData.additionalData as IWorkflowExecuteAdditionalData, allData.mode, @@ -113,35 +266,14 @@ export class JsTaskRunner extends TaskRunner { allData.defaultReturnRunIndex, allData.selfData, allData.contextNodeName, - ); + ).getDataProxy(); + } - const customConsole = { - log: (...args: unknown[]) => { - const logOutput = args - .map((arg) => (typeof arg === 'object' && arg !== null ? JSON.stringify(arg) : arg)) - .join(' '); - console.log('[JS Code]', logOutput); - void this.makeRpcCall(task.taskId, 'logNodeOutput', [logOutput]); - }, - }; + private getErrorMessageFromVmError(error: unknown): string { + if (typeof error === 'object' && !!error && 'message' in error) { + return error.message as string; + } - const context: Context = { - require, - module: {}, - console: customConsole, - - ...dataProxy.getDataProxy(), - ...this.buildRpcCallObject(task.taskId), - }; - - const result = (await runInNewContext( - `module.exports = async function() {${settings.code}\n}()`, - context, - )) as TaskResultData['result']; - - return { - result, - customData: allData.runExecutionData.resultData.metadata, - }; + return JSON.stringify(error); } } diff --git a/packages/@n8n/task-runner/src/execution-error.ts b/packages/@n8n/task-runner/src/execution-error.ts new file mode 100644 index 0000000000..e1fdffc0b6 --- /dev/null +++ b/packages/@n8n/task-runner/src/execution-error.ts @@ -0,0 +1,84 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class ExecutionError extends ApplicationError { + description: string | null = null; + + itemIndex: number | undefined = undefined; + + context: { itemIndex: number } | undefined = undefined; + + stack = ''; + + lineNumber: number | undefined = undefined; + + constructor(error: Error & { stack?: string }, itemIndex?: number) { + super(error.message); + this.itemIndex = itemIndex; + + if (this.itemIndex !== undefined) { + this.context = { itemIndex: this.itemIndex }; + } + + this.stack = error.stack ?? ''; + + this.populateFromStack(); + } + + /** + * Populate error `message` and `description` from error `stack`. + */ + private populateFromStack() { + const stackRows = this.stack.split('\n'); + + if (stackRows.length === 0) { + this.message = 'Unknown error'; + } + + const messageRow = stackRows.find((line) => line.includes('Error:')); + const lineNumberRow = stackRows.find((line) => line.includes('Code:')); + const lineNumberDisplay = this.toLineNumberDisplay(lineNumberRow); + + if (!messageRow) { + this.message = `Unknown error ${lineNumberDisplay}`; + return; + } + + const [errorDetails, errorType] = this.toErrorDetailsAndType(messageRow); + + if (errorType) this.description = errorType; + + if (!errorDetails) { + this.message = `Unknown error ${lineNumberDisplay}`; + return; + } + + this.message = `${errorDetails} ${lineNumberDisplay}`; + } + + private toLineNumberDisplay(lineNumberRow?: string) { + const errorLineNumberMatch = lineNumberRow?.match(/Code:(?\d+)/); + + if (!errorLineNumberMatch?.groups?.lineNumber) return null; + + const lineNumber = errorLineNumberMatch.groups.lineNumber; + + this.lineNumber = Number(lineNumber); + + if (!lineNumber) return ''; + + return this.itemIndex === undefined + ? `[line ${lineNumber}]` + : `[line ${lineNumber}, for item ${this.itemIndex}]`; + } + + private toErrorDetailsAndType(messageRow?: string) { + if (!messageRow) return [null, null]; + + const [errorDetails, errorType] = messageRow + .split(':') + .reverse() + .map((i) => i.trim()); + + return [errorDetails, errorType === 'Error' ? null : errorType]; + } +} diff --git a/packages/@n8n/task-runner/src/obj-utils.ts b/packages/@n8n/task-runner/src/obj-utils.ts new file mode 100644 index 0000000000..1e49e475d2 --- /dev/null +++ b/packages/@n8n/task-runner/src/obj-utils.ts @@ -0,0 +1,5 @@ +export function isObject(maybe: unknown): maybe is { [key: string]: unknown } { + return ( + typeof maybe === 'object' && maybe !== null && !Array.isArray(maybe) && !(maybe instanceof Date) + ); +} diff --git a/packages/@n8n/task-runner/src/result-validation.ts b/packages/@n8n/task-runner/src/result-validation.ts new file mode 100644 index 0000000000..1456805bf4 --- /dev/null +++ b/packages/@n8n/task-runner/src/result-validation.ts @@ -0,0 +1,116 @@ +import { normalizeItems } from 'n8n-core'; +import type { INodeExecutionData } from 'n8n-workflow'; + +import { isObject } from '@/obj-utils'; +import { ValidationError } from '@/validation-error'; + +export const REQUIRED_N8N_ITEM_KEYS = new Set(['json', 'binary', 'pairedItem', 'error']); + +function validateTopLevelKeys(item: INodeExecutionData, itemIndex: number) { + for (const key in item) { + if (Object.prototype.hasOwnProperty.call(item, key)) { + if (REQUIRED_N8N_ITEM_KEYS.has(key)) return; + + throw new ValidationError({ + message: `Unknown top-level item key: ${key}`, + description: 'Access the properties of an item under `.json`, e.g. `item.json`', + itemIndex, + }); + } + } +} + +function validateItem({ json, binary }: INodeExecutionData, itemIndex: number) { + if (json === undefined || !isObject(json)) { + throw new ValidationError({ + message: "A 'json' property isn't an object", + description: "In the returned data, every key named 'json' must point to an object.", + itemIndex, + }); + } + + if (binary !== undefined && !isObject(binary)) { + throw new ValidationError({ + message: "A 'binary' property isn't an object", + description: "In the returned data, every key named 'binary' must point to an object.", + itemIndex, + }); + } +} + +/** + * Validates the output of a code node in 'Run for All Items' mode. + */ +export function validateRunForAllItemsOutput( + executionResult: INodeExecutionData | INodeExecutionData[] | undefined, +) { + if (typeof executionResult !== 'object') { + throw new ValidationError({ + message: "Code doesn't return items properly", + description: 'Please return an array of objects, one for each item you would like to output.', + }); + } + + if (Array.isArray(executionResult)) { + /** + * If at least one top-level key is an n8n item key (`json`, `binary`, etc.), + * then require all item keys to be an n8n item key. + * + * If no top-level key is an n8n key, then skip this check, allowing non-n8n + * item keys to be wrapped in `json` when normalizing items below. + */ + const mustHaveTopLevelN8nKey = executionResult.some((item) => + Object.keys(item).find((key) => REQUIRED_N8N_ITEM_KEYS.has(key)), + ); + + if (mustHaveTopLevelN8nKey) { + for (let index = 0; index < executionResult.length; index++) { + const item = executionResult[index]; + validateTopLevelKeys(item, index); + } + } + } + + const returnData = normalizeItems(executionResult); + returnData.forEach(validateItem); + return returnData; +} + +/** + * Validates the output of a code node in 'Run for Each Item' mode for single item + */ +export function validateRunForEachItemOutput( + executionResult: INodeExecutionData | undefined, + itemIndex: number, +) { + if (typeof executionResult !== 'object') { + throw new ValidationError({ + message: "Code doesn't return an object", + description: `Please return an object representing the output item. ('${executionResult}' was returned instead.)`, + itemIndex, + }); + } + + if (Array.isArray(executionResult)) { + const firstSentence = + executionResult.length > 0 + ? `An array of ${typeof executionResult[0]}s was returned.` + : 'An empty array was returned.'; + throw new ValidationError({ + message: "Code doesn't return a single object", + description: `${firstSentence} If you need to output multiple items, please use the 'Run Once for All Items' mode instead.`, + itemIndex, + }); + } + + const [returnData] = normalizeItems([executionResult]); + + validateItem(returnData, itemIndex); + + // If at least one top-level key is a supported item key (`json`, `binary`, etc.), + // and another top-level key is unrecognized, then the user mis-added a property + // directly on the item, when they intended to add it on the `json` property + validateTopLevelKeys(returnData, itemIndex); + + return returnData; +} diff --git a/packages/@n8n/task-runner/src/task-runner.ts b/packages/@n8n/task-runner/src/task-runner.ts index 6c723ea657..89402d885c 100644 --- a/packages/@n8n/task-runner/src/task-runner.ts +++ b/packages/@n8n/task-runner/src/task-runner.ts @@ -257,11 +257,8 @@ export abstract class TaskRunner { const data = await this.executeTask(task); this.taskDone(taskId, data); } catch (e) { - if (ensureError(e)) { - this.taskErrored(taskId, (e as Error).message); - } else { - this.taskErrored(taskId, e); - } + const error = ensureError(e); + this.taskErrored(taskId, error); } } diff --git a/packages/@n8n/task-runner/src/validation-error.ts b/packages/@n8n/task-runner/src/validation-error.ts new file mode 100644 index 0000000000..f2ba712c2c --- /dev/null +++ b/packages/@n8n/task-runner/src/validation-error.ts @@ -0,0 +1,44 @@ +import { ApplicationError } from 'n8n-workflow'; + +export class ValidationError extends ApplicationError { + description = ''; + + itemIndex: number | undefined = undefined; + + context: { itemIndex: number } | undefined = undefined; + + lineNumber: number | undefined = undefined; + + constructor({ + message, + description, + itemIndex, + lineNumber, + }: { + message: string; + description: string; + itemIndex?: number; + lineNumber?: number; + }) { + super(message); + + this.lineNumber = lineNumber; + this.itemIndex = itemIndex; + + if (this.lineNumber !== undefined && this.itemIndex !== undefined) { + this.message = `${message} [line ${lineNumber}, for item ${itemIndex}]`; + } else if (this.lineNumber !== undefined) { + this.message = `${message} [line ${lineNumber}]`; + } else if (this.itemIndex !== undefined) { + this.message = `${message} [item ${itemIndex}]`; + } else { + this.message = message; + } + + this.description = description; + + if (this.itemIndex !== undefined) { + this.context = { itemIndex: this.itemIndex }; + } + } +} diff --git a/packages/nodes-base/nodes/AiTransform/AiTransform.node.ts b/packages/nodes-base/nodes/AiTransform/AiTransform.node.ts index ac68f13f0e..3741fc980e 100644 --- a/packages/nodes-base/nodes/AiTransform/AiTransform.node.ts +++ b/packages/nodes-base/nodes/AiTransform/AiTransform.node.ts @@ -114,7 +114,7 @@ export class AiTransform implements INodeType { context.items = context.$input.all(); const Sandbox = JavaScriptSandbox; - const sandbox = new Sandbox(context, code, index, this.helpers); + const sandbox = new Sandbox(context, code, this.helpers); sandbox.on( 'output', workflowMode === 'manual' diff --git a/packages/nodes-base/nodes/Code/Code.node.ts b/packages/nodes-base/nodes/Code/Code.node.ts index a7bc4dc653..e1230b0786 100644 --- a/packages/nodes-base/nodes/Code/Code.node.ts +++ b/packages/nodes-base/nodes/Code/Code.node.ts @@ -1,3 +1,5 @@ +import { TaskRunnersConfig } from '@n8n/config'; +import set from 'lodash/set'; import { NodeConnectionType, type CodeExecutionMode, @@ -7,13 +9,12 @@ import { type INodeType, type INodeTypeDescription, } from 'n8n-workflow'; -import set from 'lodash/set'; import Container from 'typedi'; -import { TaskRunnersConfig } from '@n8n/config'; import { javascriptCodeDescription } from './descriptions/JavascriptCodeDescription'; import { pythonCodeDescription } from './descriptions/PythonCodeDescription'; import { JavaScriptSandbox } from './JavaScriptSandbox'; +import { JsTaskRunnerSandbox } from './JsTaskRunnerSandbox'; import { PythonSandbox } from './PythonSandbox'; import { getSandboxContext } from './Sandbox'; import { standardizeOutput } from './utils'; @@ -108,23 +109,17 @@ export class Code implements INodeType { const codeParameterName = language === 'python' ? 'pythonCode' : 'jsCode'; if (!runnersConfig.disabled && language === 'javaScript') { - // TODO: once per item const code = this.getNodeParameter(codeParameterName, 0) as string; - const items = await this.startJob( - { javaScript: 'javascript', python: 'python' }[language] ?? language, - { - code, - nodeMode, - workflowMode, - }, - 0, - ); + const sandbox = new JsTaskRunnerSandbox(code, nodeMode, workflowMode, this); - return [items]; + return nodeMode === 'runOnceForAllItems' + ? [await sandbox.runCodeAllItems()] + : [await sandbox.runCodeForEachItem()]; } const getSandbox = (index = 0) => { const code = this.getNodeParameter(codeParameterName, index) as string; + const context = getSandboxContext.call(this, index); if (nodeMode === 'runOnceForAllItems') { context.items = context.$input.all(); @@ -133,7 +128,7 @@ export class Code implements INodeType { } const Sandbox = language === 'python' ? PythonSandbox : JavaScriptSandbox; - const sandbox = new Sandbox(context, code, index, this.helpers); + const sandbox = new Sandbox(context, code, this.helpers); sandbox.on( 'output', workflowMode === 'manual' @@ -182,7 +177,7 @@ export class Code implements INodeType { const sandbox = getSandbox(index); let result: INodeExecutionData | undefined; try { - result = await sandbox.runCodeEachItem(); + result = await sandbox.runCodeEachItem(index); } catch (error) { if (!this.continueOnFail()) { set(error, 'node', node); diff --git a/packages/nodes-base/nodes/Code/ExecutionError.ts b/packages/nodes-base/nodes/Code/ExecutionError.ts index de7b46ff4d..e1fdffc0b6 100644 --- a/packages/nodes-base/nodes/Code/ExecutionError.ts +++ b/packages/nodes-base/nodes/Code/ExecutionError.ts @@ -11,7 +11,7 @@ export class ExecutionError extends ApplicationError { lineNumber: number | undefined = undefined; - constructor(error: Error & { stack: string }, itemIndex?: number) { + constructor(error: Error & { stack?: string }, itemIndex?: number) { super(error.message); this.itemIndex = itemIndex; @@ -19,7 +19,7 @@ export class ExecutionError extends ApplicationError { this.context = { itemIndex: this.itemIndex }; } - this.stack = error.stack; + this.stack = error.stack ?? ''; this.populateFromStack(); } diff --git a/packages/nodes-base/nodes/Code/JavaScriptSandbox.ts b/packages/nodes-base/nodes/Code/JavaScriptSandbox.ts index 6c9401eb56..73f5ba7f75 100644 --- a/packages/nodes-base/nodes/Code/JavaScriptSandbox.ts +++ b/packages/nodes-base/nodes/Code/JavaScriptSandbox.ts @@ -5,6 +5,11 @@ import { ValidationError } from './ValidationError'; import { ExecutionError } from './ExecutionError'; import type { SandboxContext } from './Sandbox'; import { Sandbox } from './Sandbox'; +import { + mapItemNotDefinedErrorIfNeededForRunForEach, + mapItemsNotDefinedErrorIfNeededForRunForAll, + validateNoDisallowedMethodsInRunForEach, +} from './JsCodeValidator'; const { NODE_FUNCTION_ALLOW_BUILTIN: builtIn, NODE_FUNCTION_ALLOW_EXTERNAL: external } = process.env; @@ -25,7 +30,6 @@ export class JavaScriptSandbox extends Sandbox { constructor( context: SandboxContext, private jsCode: string, - itemIndex: number | undefined, helpers: IExecuteFunctions['helpers'], options?: { resolver?: Resolver }, ) { @@ -36,7 +40,6 @@ export class JavaScriptSandbox extends Sandbox { plural: 'objects', }, }, - itemIndex, helpers, ); this.vm = new NodeVM({ @@ -49,10 +52,10 @@ export class JavaScriptSandbox extends Sandbox { this.vm.on('console.log', (...args: unknown[]) => this.emit('output', ...args)); } - async runCode(): Promise { + async runCode(): Promise { const script = `module.exports = async function() {${this.jsCode}\n}()`; try { - const executionResult = await this.vm.run(script, __dirname); + const executionResult = (await this.vm.run(script, __dirname)) as T; return executionResult; } catch (error) { throw new ExecutionError(error); @@ -70,10 +73,7 @@ export class JavaScriptSandbox extends Sandbox { executionResult = await this.vm.run(script, __dirname); } catch (error) { // anticipate user expecting `items` to pre-exist as in Function Item node - if (error.message === 'items is not defined' && !/(let|const|var) items =/.test(script)) { - const quoted = error.message.replace('items', '`items`'); - error.message = (quoted as string) + '. Did you mean `$input.all()`?'; - } + mapItemsNotDefinedErrorIfNeededForRunForAll(this.jsCode, error); throw new ExecutionError(error); } @@ -87,7 +87,6 @@ export class JavaScriptSandbox extends Sandbox { message: "The code doesn't return an array of arrays", description: 'Please return an array of arrays. One array for the different outputs and one for the different items that get returned.', - itemIndex: this.itemIndex, }); } @@ -101,30 +100,10 @@ export class JavaScriptSandbox extends Sandbox { ); } - async runCodeEachItem(): Promise { + async runCodeEachItem(itemIndex: number): Promise { const script = `module.exports = async function() {${this.jsCode}\n}()`; - const match = this.jsCode.match(/\$input\.(?first|last|all|itemMatching)/); - - if (match?.groups?.disallowedMethod) { - const { disallowedMethod } = match.groups; - - const lineNumber = - this.jsCode.split('\n').findIndex((line) => { - return line.includes(disallowedMethod) && !line.startsWith('//') && !line.startsWith('*'); - }) + 1; - - const disallowedMethodFound = lineNumber !== 0; - - if (disallowedMethodFound) { - throw new ValidationError({ - message: `Can't use .${disallowedMethod}() here`, - description: "This is only available in 'Run Once for All Items' mode", - itemIndex: this.itemIndex, - lineNumber, - }); - } - } + validateNoDisallowedMethodsInRunForEach(this.jsCode, itemIndex); let executionResult: INodeExecutionData; @@ -132,16 +111,13 @@ export class JavaScriptSandbox extends Sandbox { executionResult = await this.vm.run(script, __dirname); } catch (error) { // anticipate user expecting `item` to pre-exist as in Function Item node - if (error.message === 'item is not defined' && !/(let|const|var) item =/.test(script)) { - const quoted = error.message.replace('item', '`item`'); - error.message = (quoted as string) + '. Did you mean `$input.item.json`?'; - } + mapItemNotDefinedErrorIfNeededForRunForEach(this.jsCode, error); - throw new ExecutionError(error, this.itemIndex); + throw new ExecutionError(error, itemIndex); } - if (executionResult === null) return; + if (executionResult === null) return undefined; - return this.validateRunCodeEachItem(executionResult); + return this.validateRunCodeEachItem(executionResult, itemIndex); } } diff --git a/packages/nodes-base/nodes/Code/JsCodeValidator.ts b/packages/nodes-base/nodes/Code/JsCodeValidator.ts new file mode 100644 index 0000000000..fecb6a6854 --- /dev/null +++ b/packages/nodes-base/nodes/Code/JsCodeValidator.ts @@ -0,0 +1,54 @@ +import { ValidationError } from './ValidationError'; + +/** + * Validates that no disallowed methods are used in the + * runCodeForEachItem JS code. Throws `ValidationError` if + * a disallowed method is found. + */ +export function validateNoDisallowedMethodsInRunForEach(code: string, itemIndex: number) { + const match = code.match(/\$input\.(?first|last|all|itemMatching)/); + + if (match?.groups?.disallowedMethod) { + const { disallowedMethod } = match.groups; + + const lineNumber = + code.split('\n').findIndex((line) => { + return line.includes(disallowedMethod) && !line.startsWith('//') && !line.startsWith('*'); + }) + 1; + + const disallowedMethodFound = lineNumber !== 0; + + if (disallowedMethodFound) { + throw new ValidationError({ + message: `Can't use .${disallowedMethod}() here`, + description: "This is only available in 'Run Once for All Items' mode", + itemIndex, + lineNumber, + }); + } + } +} + +/** + * Checks if the error message indicates that `items` is not defined and + * modifies the error message to suggest using `$input.all()`. + */ +export function mapItemsNotDefinedErrorIfNeededForRunForAll(code: string, error: Error) { + // anticipate user expecting `items` to pre-exist as in Function Item node + if (error.message === 'items is not defined' && !/(let|const|var) +items +=/.test(code)) { + const quoted = error.message.replace('items', '`items`'); + error.message = (quoted as string) + '. Did you mean `$input.all()`?'; + } +} + +/** + * Maps the "item is not defined" error message to provide a more helpful suggestion + * for users who may expect `items` to pre-exist + */ +export function mapItemNotDefinedErrorIfNeededForRunForEach(code: string, error: Error) { + // anticipate user expecting `items` to pre-exist as in Function Item node + if (error.message === 'item is not defined' && !/(let|const|var) +item +=/.test(code)) { + const quoted = error.message.replace('item', '`item`'); + error.message = (quoted as string) + '. Did you mean `$input.item.json`?'; + } +} diff --git a/packages/nodes-base/nodes/Code/JsTaskRunnerSandbox.ts b/packages/nodes-base/nodes/Code/JsTaskRunnerSandbox.ts new file mode 100644 index 0000000000..fc0b197f00 --- /dev/null +++ b/packages/nodes-base/nodes/Code/JsTaskRunnerSandbox.ts @@ -0,0 +1,92 @@ +import { + ensureError, + type CodeExecutionMode, + type IExecuteFunctions, + type INodeExecutionData, + type WorkflowExecuteMode, +} from 'n8n-workflow'; + +import { ExecutionError } from './ExecutionError'; +import { + mapItemsNotDefinedErrorIfNeededForRunForAll, + validateNoDisallowedMethodsInRunForEach, +} from './JsCodeValidator'; + +/** + * JS Code execution sandbox that executes the JS code using task runner. + */ +export class JsTaskRunnerSandbox { + constructor( + private readonly jsCode: string, + private readonly nodeMode: CodeExecutionMode, + private readonly workflowMode: WorkflowExecuteMode, + private readonly executeFunctions: IExecuteFunctions, + ) {} + + async runCode(): Promise { + const itemIndex = 0; + + try { + const executionResult = (await this.executeFunctions.startJob( + 'javascript', + { + code: this.jsCode, + nodeMode: this.nodeMode, + workflowMode: this.workflowMode, + }, + itemIndex, + )) as T; + return executionResult; + } catch (e) { + const error = ensureError(e); + throw new ExecutionError(error); + } + } + + async runCodeAllItems(): Promise { + const itemIndex = 0; + + return await this.executeFunctions + .startJob( + 'javascript', + { + code: this.jsCode, + nodeMode: this.nodeMode, + workflowMode: this.workflowMode, + continueOnFail: this.executeFunctions.continueOnFail(), + }, + itemIndex, + ) + .catch((e) => { + const error = ensureError(e); + // anticipate user expecting `items` to pre-exist as in Function Item node + mapItemsNotDefinedErrorIfNeededForRunForAll(this.jsCode, error); + + throw new ExecutionError(error); + }); + } + + async runCodeForEachItem(): Promise { + validateNoDisallowedMethodsInRunForEach(this.jsCode, 0); + const itemIndex = 0; + + return await this.executeFunctions + .startJob( + 'javascript', + { + code: this.jsCode, + nodeMode: this.nodeMode, + workflowMode: this.workflowMode, + continueOnFail: this.executeFunctions.continueOnFail(), + }, + itemIndex, + ) + .catch((e) => { + const error = ensureError(e); + // anticipate user expecting `items` to pre-exist as in Function Item node + mapItemsNotDefinedErrorIfNeededForRunForAll(this.jsCode, error); + + throw new ExecutionError(error); + }); + } +} diff --git a/packages/nodes-base/nodes/Code/PythonSandbox.ts b/packages/nodes-base/nodes/Code/PythonSandbox.ts index fd1ab3f564..13a3bb6228 100644 --- a/packages/nodes-base/nodes/Code/PythonSandbox.ts +++ b/packages/nodes-base/nodes/Code/PythonSandbox.ts @@ -18,7 +18,6 @@ export class PythonSandbox extends Sandbox { constructor( context: SandboxContext, private pythonCode: string, - itemIndex: number | undefined, helpers: IExecuteFunctions['helpers'], ) { super( @@ -28,7 +27,6 @@ export class PythonSandbox extends Sandbox { plural: 'dictionaries', }, }, - itemIndex, helpers, ); // Since python doesn't allow variable names starting with `$`, @@ -39,8 +37,8 @@ export class PythonSandbox extends Sandbox { }, {} as PythonSandboxContext); } - async runCode(): Promise { - return await this.runCodeInPython(); + async runCode(): Promise { + return await this.runCodeInPython(); } async runCodeAllItems() { @@ -48,9 +46,9 @@ export class PythonSandbox extends Sandbox { return this.validateRunCodeAllItems(executionResult); } - async runCodeEachItem() { + async runCodeEachItem(itemIndex: number) { const executionResult = await this.runCodeInPython(); - return this.validateRunCodeEachItem(executionResult); + return this.validateRunCodeEachItem(executionResult, itemIndex); } private async runCodeInPython() { diff --git a/packages/nodes-base/nodes/Code/Sandbox.ts b/packages/nodes-base/nodes/Code/Sandbox.ts index aab88f1b39..932a3f0c39 100644 --- a/packages/nodes-base/nodes/Code/Sandbox.ts +++ b/packages/nodes-base/nodes/Code/Sandbox.ts @@ -1,7 +1,8 @@ import { EventEmitter } from 'events'; import type { IExecuteFunctions, INodeExecutionData, IWorkflowDataProxyData } from 'n8n-workflow'; -import { ValidationError } from './ValidationError'; + import { isObject } from './utils'; +import { ValidationError } from './ValidationError'; interface SandboxTextKeys { object: { @@ -39,26 +40,28 @@ export function getSandboxContext(this: IExecuteFunctions, index: number): Sandb export abstract class Sandbox extends EventEmitter { constructor( private textKeys: SandboxTextKeys, - protected itemIndex: number | undefined, protected helpers: IExecuteFunctions['helpers'], ) { super(); } - abstract runCode(): Promise; + abstract runCode(): Promise; abstract runCodeAllItems(): Promise; - abstract runCodeEachItem(): Promise; + abstract runCodeEachItem(itemIndex: number): Promise; - validateRunCodeEachItem(executionResult: INodeExecutionData | undefined): INodeExecutionData { + validateRunCodeEachItem( + executionResult: INodeExecutionData | undefined, + itemIndex: number, + ): INodeExecutionData { if (typeof executionResult !== 'object') { throw new ValidationError({ message: `Code doesn't return ${this.getTextKey('object', { includeArticle: true })}`, description: `Please return ${this.getTextKey('object', { includeArticle: true, })} representing the output item. ('${executionResult}' was returned instead.)`, - itemIndex: this.itemIndex, + itemIndex, }); } @@ -70,25 +73,24 @@ export abstract class Sandbox extends EventEmitter { throw new ValidationError({ message: `Code doesn't return a single ${this.getTextKey('object')}`, description: `${firstSentence} If you need to output multiple items, please use the 'Run Once for All Items' mode instead.`, - itemIndex: this.itemIndex, + itemIndex, }); } const [returnData] = this.helpers.normalizeItems([executionResult]); - this.validateItem(returnData); + this.validateItem(returnData, itemIndex); // If at least one top-level key is a supported item key (`json`, `binary`, etc.), // and another top-level key is unrecognized, then the user mis-added a property // directly on the item, when they intended to add it on the `json` property - this.validateTopLevelKeys(returnData); + this.validateTopLevelKeys(returnData, itemIndex); return returnData; } validateRunCodeAllItems( executionResult: INodeExecutionData | INodeExecutionData[] | undefined, - itemIndex?: number, ): INodeExecutionData[] { if (typeof executionResult !== 'object') { throw new ValidationError({ @@ -96,7 +98,6 @@ export abstract class Sandbox extends EventEmitter { description: `Please return an array of ${this.getTextKey('object', { plural: true, })}, one for each item you would like to output.`, - itemIndex, }); } @@ -113,14 +114,15 @@ export abstract class Sandbox extends EventEmitter { ); if (mustHaveTopLevelN8nKey) { - for (const item of executionResult) { - this.validateTopLevelKeys(item); + for (let index = 0; index < executionResult.length; index++) { + const item = executionResult[index]; + this.validateTopLevelKeys(item, index); } } } const returnData = this.helpers.normalizeItems(executionResult); - returnData.forEach((item) => this.validateItem(item)); + returnData.forEach((item, index) => this.validateItem(item, index)); return returnData; } @@ -138,7 +140,7 @@ export abstract class Sandbox extends EventEmitter { return `a ${response}`; } - private validateItem({ json, binary }: INodeExecutionData) { + private validateItem({ json, binary }: INodeExecutionData, itemIndex: number) { if (json === undefined || !isObject(json)) { throw new ValidationError({ message: `A 'json' property isn't ${this.getTextKey('object', { includeArticle: true })}`, @@ -146,7 +148,7 @@ export abstract class Sandbox extends EventEmitter { 'object', { includeArticle: true }, )}.`, - itemIndex: this.itemIndex, + itemIndex, }); } @@ -157,18 +159,18 @@ export abstract class Sandbox extends EventEmitter { 'object', { includeArticle: true }, )}.`, - itemIndex: this.itemIndex, + itemIndex, }); } } - private validateTopLevelKeys(item: INodeExecutionData) { + private validateTopLevelKeys(item: INodeExecutionData, itemIndex: number) { Object.keys(item).forEach((key) => { if (REQUIRED_N8N_ITEM_KEYS.has(key)) return; throw new ValidationError({ message: `Unknown top-level item key: ${key}`, description: 'Access the properties of an item under `.json`, e.g. `item.json`', - itemIndex: this.itemIndex, + itemIndex, }); }); }