From e22d0f3877576bface6c4532aff0b937ec04eb9b Mon Sep 17 00:00:00 2001 From: Tomi Turtiainen <10324676+tomi@users.noreply.github.com> Date: Tue, 26 Nov 2024 12:21:51 +0200 Subject: [PATCH] perf(core): Batch items sent in runonceforeachitem mode (no-changelog) (#11870) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Iván Ovejero --- .../@n8n/config/src/configs/runners.config.ts | 4 - packages/@n8n/config/test/config.test.ts | 1 - .../data-request-response-reconstruct.test.ts | 91 +++++++++++++++++++ .../data-request-response-reconstruct.ts | 39 ++++++-- .../__tests__/js-task-runner.test.ts | 29 +++++- .../src/js-task-runner/__tests__/test-data.ts | 1 - .../__tests__/built-ins-parser-state.test.ts | 70 ++++++++++++-- .../built-ins-parser-state.ts | 10 +- .../src/js-task-runner/js-task-runner.ts | 55 ++++++++--- packages/@n8n/task-runner/src/runner-types.ts | 14 ++- .../src/runners/__tests__/task-broker.test.ts | 4 +- .../data-request-response-stripper.test.ts | 53 ++++++++++- .../data-request-response-stripper.ts | 25 ++++- .../src/runners/task-managers/task-manager.ts | 20 +--- packages/nodes-base/nodes/Code/Code.node.ts | 3 +- .../nodes/Code/JsTaskRunnerSandbox.ts | 60 +++++++++--- .../Code/test/JsTaskRunnerSandbox.test.ts | 61 +++++++++++++ 17 files changed, 457 insertions(+), 83 deletions(-) create mode 100644 packages/@n8n/task-runner/src/data-request/__tests__/data-request-response-reconstruct.test.ts create mode 100644 packages/nodes-base/nodes/Code/test/JsTaskRunnerSandbox.test.ts diff --git a/packages/@n8n/config/src/configs/runners.config.ts b/packages/@n8n/config/src/configs/runners.config.ts index 406512f832..d3fca6da08 100644 --- a/packages/@n8n/config/src/configs/runners.config.ts +++ b/packages/@n8n/config/src/configs/runners.config.ts @@ -43,10 +43,6 @@ export class TaskRunnersConfig { @Env('N8N_RUNNERS_MAX_CONCURRENCY') maxConcurrency: number = 5; - /** Should the output of deduplication be asserted for correctness */ - @Env('N8N_RUNNERS_ASSERT_DEDUPLICATION_OUTPUT') - assertDeduplicationOutput: boolean = false; - /** How long (in seconds) a task is allowed to take for completion, else the task will be aborted and the runner restarted. Must be greater than 0. */ @Env('N8N_RUNNERS_TASK_TIMEOUT') taskTimeout: number = 60; diff --git a/packages/@n8n/config/test/config.test.ts b/packages/@n8n/config/test/config.test.ts index c5dc2a35b5..58b808ee4b 100644 --- a/packages/@n8n/config/test/config.test.ts +++ b/packages/@n8n/config/test/config.test.ts @@ -231,7 +231,6 @@ describe('GlobalConfig', () => { port: 5679, maxOldSpaceSize: '', maxConcurrency: 5, - assertDeduplicationOutput: false, taskTimeout: 60, heartbeatInterval: 30, }, diff --git a/packages/@n8n/task-runner/src/data-request/__tests__/data-request-response-reconstruct.test.ts b/packages/@n8n/task-runner/src/data-request/__tests__/data-request-response-reconstruct.test.ts new file mode 100644 index 0000000000..40768bb5cc --- /dev/null +++ b/packages/@n8n/task-runner/src/data-request/__tests__/data-request-response-reconstruct.test.ts @@ -0,0 +1,91 @@ +import { mock } from 'jest-mock-extended'; +import type { + IExecuteData, + INode, + INodeExecutionData, + ITaskDataConnectionsSource, +} from 'n8n-workflow'; + +import type { DataRequestResponse, InputDataChunkDefinition } from '@/runner-types'; + +import { DataRequestResponseReconstruct } from '../data-request-response-reconstruct'; + +describe('DataRequestResponseReconstruct', () => { + const reconstruct = new DataRequestResponseReconstruct(); + + describe('reconstructConnectionInputItems', () => { + it('should return all input items if no chunk is provided', () => { + const inputData: DataRequestResponse['inputData'] = { + main: [[{ json: { key: 'value' } }]], + }; + + const result = reconstruct.reconstructConnectionInputItems(inputData); + + expect(result).toEqual([{ json: { key: 'value' } }]); + }); + + it('should reconstruct sparse array when chunk is provided', () => { + const inputData: DataRequestResponse['inputData'] = { + main: [[{ json: { key: 'chunked' } }]], + }; + const chunk: InputDataChunkDefinition = { startIndex: 2, count: 1 }; + + const result = reconstruct.reconstructConnectionInputItems(inputData, chunk); + + expect(result).toEqual([undefined, undefined, { json: { key: 'chunked' } }, undefined]); + }); + + it('should handle empty input data gracefully', () => { + const inputData: DataRequestResponse['inputData'] = { main: [[]] }; + const chunk: InputDataChunkDefinition = { startIndex: 1, count: 1 }; + + const result = reconstruct.reconstructConnectionInputItems(inputData, chunk); + + expect(result).toEqual([undefined]); + }); + }); + + describe('reconstructExecuteData', () => { + it('should reconstruct execute data with the provided input items', () => { + const node = mock(); + const connectionInputSource = mock(); + const response = mock({ + inputData: { main: [[]] }, + node, + connectionInputSource, + }); + const inputItems: INodeExecutionData[] = [{ json: { key: 'reconstructed' } }]; + + const result = reconstruct.reconstructExecuteData(response, inputItems); + + expect(result).toEqual({ + data: { + main: [inputItems], + }, + node: response.node, + source: response.connectionInputSource, + }); + }); + + it('should handle empty input items gracefully', () => { + const node = mock(); + const connectionInputSource = mock(); + const inputItems: INodeExecutionData[] = []; + const response = mock({ + inputData: { main: [[{ json: { key: 'value' } }]] }, + node, + connectionInputSource, + }); + + const result = reconstruct.reconstructExecuteData(response, inputItems); + + expect(result).toEqual({ + data: { + main: [inputItems], + }, + node: response.node, + source: response.connectionInputSource, + }); + }); + }); +}); diff --git a/packages/@n8n/task-runner/src/data-request/data-request-response-reconstruct.ts b/packages/@n8n/task-runner/src/data-request/data-request-response-reconstruct.ts index 83a291a491..29f50c3610 100644 --- a/packages/@n8n/task-runner/src/data-request/data-request-response-reconstruct.ts +++ b/packages/@n8n/task-runner/src/data-request/data-request-response-reconstruct.ts @@ -1,6 +1,6 @@ -import type { IExecuteData, INodeExecutionData } from 'n8n-workflow'; +import type { IExecuteData, INodeExecutionData, ITaskDataConnections } from 'n8n-workflow'; -import type { DataRequestResponse } from '@/runner-types'; +import type { DataRequestResponse, InputDataChunkDefinition } from '@/runner-types'; /** * Reconstructs data from a DataRequestResponse to the initial @@ -8,20 +8,43 @@ import type { DataRequestResponse } from '@/runner-types'; */ export class DataRequestResponseReconstruct { /** - * Reconstructs `connectionInputData` from a DataRequestResponse + * Reconstructs `inputData` from a DataRequestResponse */ - reconstructConnectionInputData( + reconstructConnectionInputItems( inputData: DataRequestResponse['inputData'], - ): INodeExecutionData[] { - return inputData?.main?.[0] ?? []; + chunk?: InputDataChunkDefinition, + ): Array { + const inputItems = inputData?.main?.[0] ?? []; + if (!chunk) { + return inputItems; + } + + // Only a chunk of the input items was requested. We reconstruct + // the array by filling in the missing items with `undefined`. + let sparseInputItems: Array = []; + + sparseInputItems = sparseInputItems + .concat(Array.from({ length: chunk.startIndex })) + .concat(inputItems) + .concat(Array.from({ length: inputItems.length - chunk.startIndex - chunk.count })); + + return sparseInputItems; } /** * Reconstruct `executeData` from a DataRequestResponse */ - reconstructExecuteData(response: DataRequestResponse): IExecuteData { + reconstructExecuteData( + response: DataRequestResponse, + inputItems: INodeExecutionData[], + ): IExecuteData { + const inputData: ITaskDataConnections = { + ...response.inputData, + main: [inputItems], + }; + return { - data: response.inputData, + data: inputData, node: response.node, source: response.connectionInputSource, }; diff --git a/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts b/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts index 736bcf16e7..05e0b91f77 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/__tests__/js-task-runner.test.ts @@ -10,7 +10,7 @@ import { ExecutionError } from '@/js-task-runner/errors/execution-error'; import { ValidationError } from '@/js-task-runner/errors/validation-error'; import type { JSExecSettings } from '@/js-task-runner/js-task-runner'; import { JsTaskRunner } from '@/js-task-runner/js-task-runner'; -import type { DataRequestResponse } from '@/runner-types'; +import type { DataRequestResponse, InputDataChunkDefinition } from '@/runner-types'; import type { Task } from '@/task-runner'; import { @@ -95,17 +95,19 @@ describe('JsTaskRunner', () => { inputItems, settings, runner, + chunk, }: { code: string; inputItems: IDataObject[]; settings?: Partial; - runner?: JsTaskRunner; + chunk?: InputDataChunkDefinition; }) => { return await execTaskWithParams({ task: newTaskWithSettings({ code, nodeMode: 'runOnceForEachItem', + chunk, ...settings, }), taskData: newDataRequestResponse(inputItems.map(wrapIntoJson)), @@ -509,6 +511,28 @@ describe('JsTaskRunner', () => { ); }); + describe('chunked execution', () => { + it('should use correct index for each item', async () => { + const outcome = await executeForEachItem({ + code: 'return { ...$json, idx: $itemIndex }', + inputItems: [{ a: 1 }, { b: 2 }, { c: 3 }], + chunk: { + startIndex: 100, + count: 3, + }, + }); + + expect(outcome).toEqual({ + result: [ + withPairedItem(100, wrapIntoJson({ a: 1, idx: 100 })), + withPairedItem(101, wrapIntoJson({ b: 2, idx: 101 })), + withPairedItem(102, wrapIntoJson({ c: 3, idx: 102 })), + ], + customData: undefined, + }); + }); + }); + it('should return static items', async () => { const outcome = await executeForEachItem({ code: 'return {json: {b: 1}}', @@ -801,7 +825,6 @@ describe('JsTaskRunner', () => { code: 'unknown; return []', nodeMode: 'runOnceForAllItems', continueOnFail: false, - mode: 'manual', workflowMode: 'manual', }); runner.runningTasks.set(taskId, task); diff --git a/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts b/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts index 224f630807..ef910e838f 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/__tests__/test-data.ts @@ -16,7 +16,6 @@ export const newTaskWithSettings = ( settings: { workflowMode: 'manual', continueOnFail: false, - mode: 'manual', ...settings, }, active: true, diff --git a/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/__tests__/built-ins-parser-state.test.ts b/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/__tests__/built-ins-parser-state.test.ts index 0b75ae563e..eb38eb31c4 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/__tests__/built-ins-parser-state.test.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/__tests__/built-ins-parser-state.test.ts @@ -1,14 +1,17 @@ import { BuiltInsParserState } from '../built-ins-parser-state'; describe('BuiltInsParserState', () => { - describe('toDataRequestSpecification', () => { + describe('toDataRequestParams', () => { it('should return empty array when no properties are marked as needed', () => { const state = new BuiltInsParserState(); expect(state.toDataRequestParams()).toEqual({ dataOfNodes: [], env: false, - input: false, + input: { + chunk: undefined, + include: false, + }, prevNode: false, }); }); @@ -20,7 +23,10 @@ describe('BuiltInsParserState', () => { expect(state.toDataRequestParams()).toEqual({ dataOfNodes: 'all', env: false, - input: true, + input: { + chunk: undefined, + include: true, + }, prevNode: false, }); }); @@ -33,7 +39,10 @@ describe('BuiltInsParserState', () => { expect(state.toDataRequestParams()).toEqual({ dataOfNodes: ['Node1', 'Node2'], env: false, - input: false, + input: { + chunk: undefined, + include: false, + }, prevNode: false, }); }); @@ -47,7 +56,10 @@ describe('BuiltInsParserState', () => { expect(state.toDataRequestParams()).toEqual({ dataOfNodes: 'all', env: false, - input: true, + input: { + chunk: undefined, + include: true, + }, prevNode: false, }); }); @@ -59,7 +71,10 @@ describe('BuiltInsParserState', () => { expect(state.toDataRequestParams()).toEqual({ dataOfNodes: [], env: true, - input: false, + input: { + chunk: undefined, + include: false, + }, prevNode: false, }); }); @@ -71,7 +86,33 @@ describe('BuiltInsParserState', () => { expect(state.toDataRequestParams()).toEqual({ dataOfNodes: [], env: false, - input: true, + input: { + chunk: undefined, + include: true, + }, + prevNode: false, + }); + }); + + it('should use the given chunk', () => { + const state = new BuiltInsParserState(); + state.markInputAsNeeded(); + + expect( + state.toDataRequestParams({ + count: 10, + startIndex: 5, + }), + ).toEqual({ + dataOfNodes: [], + env: false, + input: { + chunk: { + count: 10, + startIndex: 5, + }, + include: true, + }, prevNode: false, }); }); @@ -83,7 +124,10 @@ describe('BuiltInsParserState', () => { expect(state.toDataRequestParams()).toEqual({ dataOfNodes: [], env: false, - input: false, + input: { + chunk: undefined, + include: false, + }, prevNode: true, }); }); @@ -98,7 +142,10 @@ describe('BuiltInsParserState', () => { expect(state.toDataRequestParams()).toEqual({ dataOfNodes: 'all', env: true, - input: true, + input: { + chunk: undefined, + include: true, + }, prevNode: true, }); }); @@ -109,7 +156,10 @@ describe('BuiltInsParserState', () => { expect(state.toDataRequestParams()).toEqual({ dataOfNodes: 'all', env: true, - input: true, + input: { + chunk: undefined, + include: true, + }, prevNode: true, }); }); diff --git a/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/built-ins-parser-state.ts b/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/built-ins-parser-state.ts index 064a45df67..777738e123 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/built-ins-parser-state.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/built-ins-parser/built-ins-parser-state.ts @@ -1,4 +1,5 @@ import type { BrokerMessage } from '@/message-types'; +import type { InputDataChunkDefinition } from '@/runner-types'; /** * Class to keep track of which built-in variables are accessed in the code @@ -53,11 +54,16 @@ export class BuiltInsParserState { this.needs$prevNode = true; } - toDataRequestParams(): BrokerMessage.ToRequester.TaskDataRequest['requestParams'] { + toDataRequestParams( + chunk?: InputDataChunkDefinition, + ): BrokerMessage.ToRequester.TaskDataRequest['requestParams'] { return { dataOfNodes: this.needsAllNodes ? 'all' : Array.from(this.neededNodeNames), env: this.needs$env, - input: this.needs$input, + input: { + include: this.needs$input, + chunk, + }, prevNode: this.needs$prevNode, }; } diff --git a/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts b/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts index c64d58636b..005267862e 100644 --- a/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts +++ b/packages/@n8n/task-runner/src/js-task-runner/js-task-runner.ts @@ -19,7 +19,12 @@ import * as a from 'node:assert'; import { runInNewContext, type Context } from 'node:vm'; import type { MainConfig } from '@/config/main-config'; -import type { DataRequestResponse, PartialAdditionalData, TaskResultData } from '@/runner-types'; +import type { + DataRequestResponse, + InputDataChunkDefinition, + PartialAdditionalData, + TaskResultData, +} from '@/runner-types'; import { type Task, TaskRunner } from '@/task-runner'; import { BuiltInsParser } from './built-ins-parser/built-ins-parser'; @@ -37,9 +42,8 @@ export interface JSExecSettings { nodeMode: CodeExecutionMode; workflowMode: WorkflowExecuteMode; continueOnFail: boolean; - - // For workflow data proxy - mode: WorkflowExecuteMode; + // For executing partial input data + chunk?: InputDataChunkDefinition; } export interface JsTaskData { @@ -94,6 +98,8 @@ export class JsTaskRunner extends TaskRunner { const settings = task.settings; a.ok(settings, 'JS Code not sent to runner'); + this.validateTaskSettings(settings); + const neededBuiltInsResult = this.builtInsParser.parseUsedBuiltIns(settings.code); const neededBuiltIns = neededBuiltInsResult.ok ? neededBuiltInsResult.result @@ -101,10 +107,10 @@ export class JsTaskRunner extends TaskRunner { const dataResponse = await this.requestData( task.taskId, - neededBuiltIns.toDataRequestParams(), + neededBuiltIns.toDataRequestParams(settings.chunk), ); - const data = this.reconstructTaskData(dataResponse); + const data = this.reconstructTaskData(dataResponse, settings.chunk); await this.requestNodeTypeIfNeeded(neededBuiltIns, data.workflow, task.taskId); @@ -136,6 +142,14 @@ export class JsTaskRunner extends TaskRunner { }; } + private validateTaskSettings(settings: JSExecSettings) { + a.ok(settings.code, 'No code to execute'); + + if (settings.nodeMode === 'runOnceForAllItems') { + a.ok(settings.chunk === undefined, 'Chunking is not supported for runOnceForAllItems'); + } + } + private getNativeVariables() { return { // Exposed Node.js globals in vm2 @@ -220,7 +234,13 @@ export class JsTaskRunner extends TaskRunner { const inputItems = data.connectionInputData; const returnData: INodeExecutionData[] = []; - for (let index = 0; index < inputItems.length; index++) { + // If a chunk was requested, only process the items in the chunk + const chunkStartIdx = settings.chunk ? settings.chunk.startIndex : 0; + const chunkEndIdx = settings.chunk + ? settings.chunk.startIndex + settings.chunk.count + : inputItems.length; + + for (let index = chunkStartIdx; index < chunkEndIdx; index++) { const item = inputItems[index]; const dataProxy = this.createDataProxy(data, workflow, index); const context: Context = { @@ -325,13 +345,24 @@ export class JsTaskRunner extends TaskRunner { return new ExecutionError({ message: JSON.stringify(error) }); } - private reconstructTaskData(response: DataRequestResponse): JsTaskData { + private reconstructTaskData( + response: DataRequestResponse, + chunk?: InputDataChunkDefinition, + ): JsTaskData { + const inputData = this.taskDataReconstruct.reconstructConnectionInputItems( + response.inputData, + chunk, + // This type assertion is intentional. Chunking is only supported in + // runOnceForEachItem mode and if a chunk was requested, we intentionally + // fill the array with undefined values for the items outside the chunk. + // We only iterate over the chunk items but WorkflowDataProxy expects + // the full array of items. + ) as INodeExecutionData[]; + return { ...response, - connectionInputData: this.taskDataReconstruct.reconstructConnectionInputData( - response.inputData, - ), - executeData: this.taskDataReconstruct.reconstructExecuteData(response), + connectionInputData: inputData, + executeData: this.taskDataReconstruct.reconstructExecuteData(response, inputData), }; } diff --git a/packages/@n8n/task-runner/src/runner-types.ts b/packages/@n8n/task-runner/src/runner-types.ts index 4649c2cc2f..174d652e7f 100644 --- a/packages/@n8n/task-runner/src/runner-types.ts +++ b/packages/@n8n/task-runner/src/runner-types.ts @@ -15,6 +15,18 @@ import type { WorkflowParameters, } from 'n8n-workflow'; +export interface InputDataChunkDefinition { + startIndex: number; + count: number; +} + +export interface InputDataRequestParams { + /** Whether to include the input data in the response */ + include: boolean; + /** Optionally request only a specific chunk of data instead of all input data */ + chunk?: InputDataChunkDefinition; +} + /** * Specifies what data should be included for a task data request. */ @@ -22,7 +34,7 @@ export interface TaskDataRequestParams { dataOfNodes: string[] | 'all'; prevNode: boolean; /** Whether input data for the node should be included */ - input: boolean; + input: InputDataRequestParams; /** Whether env provider's state should be included */ env: boolean; } diff --git a/packages/cli/src/runners/__tests__/task-broker.test.ts b/packages/cli/src/runners/__tests__/task-broker.test.ts index 2f9a6a26e7..8e86f189e8 100644 --- a/packages/cli/src/runners/__tests__/task-broker.test.ts +++ b/packages/cli/src/runners/__tests__/task-broker.test.ts @@ -524,7 +524,9 @@ describe('TaskBroker', () => { const requestParams: RunnerMessage.ToBroker.TaskDataRequest['requestParams'] = { dataOfNodes: 'all', env: true, - input: true, + input: { + include: true, + }, prevNode: true, }; diff --git a/packages/cli/src/runners/task-managers/__tests__/data-request-response-stripper.test.ts b/packages/cli/src/runners/task-managers/__tests__/data-request-response-stripper.test.ts index d100fc1430..4b27542c4d 100644 --- a/packages/cli/src/runners/task-managers/__tests__/data-request-response-stripper.test.ts +++ b/packages/cli/src/runners/task-managers/__tests__/data-request-response-stripper.test.ts @@ -18,6 +18,7 @@ const workflow: DataRequestResponse['workflow'] = mock { const allDataParam: TaskDataRequestParams = { dataOfNodes: 'all', env: true, - input: true, + input: { + include: true, + }, prevNode: true, }; @@ -177,7 +205,9 @@ describe('DataRequestResponseStripper', () => { describe('input data', () => { const allExceptInputParam = newRequestParam({ - input: false, + input: { + include: false, + }, }); it('drops input data from result', () => { @@ -186,10 +216,23 @@ describe('DataRequestResponseStripper', () => { expect(result.inputData).toStrictEqual({}); }); - it('drops input data from result', () => { - const result = new DataRequestResponseStripper(taskData, allExceptInputParam).strip(); + it('returns only chunked data', () => { + const result = new DataRequestResponseStripper( + taskData, + newRequestParam({ + input: { + include: true, + chunk: { + startIndex: 1, + count: 1, + }, + }, + }), + ).strip(); - expect(result.inputData).toStrictEqual({}); + expect(result.inputData).toStrictEqual({ + main: [debugHelperNodeOutItems.slice(1, 2)], + }); }); }); diff --git a/packages/cli/src/runners/task-managers/data-request-response-stripper.ts b/packages/cli/src/runners/task-managers/data-request-response-stripper.ts index 721f51d3c1..8acfd26c7b 100644 --- a/packages/cli/src/runners/task-managers/data-request-response-stripper.ts +++ b/packages/cli/src/runners/task-managers/data-request-response-stripper.ts @@ -6,6 +6,7 @@ import type { IRunExecutionData, ITaskDataConnections, } from 'n8n-workflow'; +import * as a from 'node:assert/strict'; /** * Strips data from data request response based on the specified parameters @@ -81,11 +82,31 @@ export class DataRequestResponseStripper { } private stripInputData(inputData: ITaskDataConnections): ITaskDataConnections { - if (this.stripParams.input) { + if (!this.stripParams.input.include) { + return {}; + } + + return this.stripParams.input.chunk ? this.stripChunkedInputData(inputData) : inputData; + } + + private stripChunkedInputData(inputData: ITaskDataConnections): ITaskDataConnections { + const { chunk } = this.stripParams.input; + a.ok(chunk); + + const inputItems = inputData.main?.[0]; + if (!inputItems) { return inputData; } - return {}; + // If a chunk of the input data is requested, we only return that chunk + // It is the responsibility of the requester to rebuild the input data + const chunkInputItems = inputItems.slice(chunk.startIndex, chunk.startIndex + chunk.count); + const chunkedInputData: ITaskDataConnections = { + ...inputData, + main: [chunkInputItems], + }; + + return chunkedInputData; } /** diff --git a/packages/cli/src/runners/task-managers/task-manager.ts b/packages/cli/src/runners/task-managers/task-manager.ts index ffc86cdf62..66f07f7b0a 100644 --- a/packages/cli/src/runners/task-managers/task-manager.ts +++ b/packages/cli/src/runners/task-managers/task-manager.ts @@ -1,6 +1,5 @@ -import { TaskRunnersConfig } from '@n8n/config'; import type { TaskResultData, RequesterMessage, BrokerMessage, TaskData } from '@n8n/task-runner'; -import { DataRequestResponseReconstruct, RPC_ALLOW_LIST } from '@n8n/task-runner'; +import { RPC_ALLOW_LIST } from '@n8n/task-runner'; import type { EnvProviderState, IExecuteFunctions, @@ -18,8 +17,7 @@ import type { } from 'n8n-workflow'; import { createResultOk, createResultError } from 'n8n-workflow'; import { nanoid } from 'nanoid'; -import * as a from 'node:assert/strict'; -import Container, { Service } from 'typedi'; +import { Service } from 'typedi'; import { NodeTypes } from '@/node-types'; @@ -59,8 +57,6 @@ export abstract class TaskManager { tasks: Map = new Map(); - private readonly runnerConfig = Container.get(TaskRunnersConfig); - private readonly dataResponseBuilder = new DataRequestResponseBuilder(); constructor(private readonly nodeTypes: NodeTypes) {} @@ -246,18 +242,6 @@ export abstract class TaskManager { const dataRequestResponse = this.dataResponseBuilder.buildFromTaskData(job.data); - if (this.runnerConfig.assertDeduplicationOutput) { - const reconstruct = new DataRequestResponseReconstruct(); - a.deepStrictEqual( - reconstruct.reconstructConnectionInputData(dataRequestResponse.inputData), - job.data.connectionInputData, - ); - a.deepStrictEqual( - reconstruct.reconstructExecuteData(dataRequestResponse), - job.data.executeData, - ); - } - const strippedData = new DataRequestResponseStripper( dataRequestResponse, requestParams, diff --git a/packages/nodes-base/nodes/Code/Code.node.ts b/packages/nodes-base/nodes/Code/Code.node.ts index 5f9b537d96..65028f037e 100644 --- a/packages/nodes-base/nodes/Code/Code.node.ts +++ b/packages/nodes-base/nodes/Code/Code.node.ts @@ -111,10 +111,11 @@ export class Code implements INodeType { if (runnersConfig.enabled && language === 'javaScript') { const code = this.getNodeParameter(codeParameterName, 0) as string; const sandbox = new JsTaskRunnerSandbox(code, nodeMode, workflowMode, this); + const numInputItems = this.getInputData().length; return nodeMode === 'runOnceForAllItems' ? [await sandbox.runCodeAllItems()] - : [await sandbox.runCodeForEachItem()]; + : [await sandbox.runCodeForEachItem(numInputItems)]; } const getSandbox = (index = 0) => { diff --git a/packages/nodes-base/nodes/Code/JsTaskRunnerSandbox.ts b/packages/nodes-base/nodes/Code/JsTaskRunnerSandbox.ts index 98d98a28e3..8a506ce40c 100644 --- a/packages/nodes-base/nodes/Code/JsTaskRunnerSandbox.ts +++ b/packages/nodes-base/nodes/Code/JsTaskRunnerSandbox.ts @@ -18,6 +18,7 @@ export class JsTaskRunnerSandbox { private readonly nodeMode: CodeExecutionMode, private readonly workflowMode: WorkflowExecuteMode, private readonly executeFunctions: IExecuteFunctions, + private readonly chunkSize = 1000, ) {} async runCodeAllItems(): Promise { @@ -39,24 +40,37 @@ export class JsTaskRunnerSandbox { : this.throwExecutionError(executionResult.error); } - async runCodeForEachItem(): Promise { + async runCodeForEachItem(numInputItems: number): Promise { validateNoDisallowedMethodsInRunForEach(this.jsCode, 0); + const itemIndex = 0; + const chunks = this.chunkInputItems(numInputItems); + let executionResults: INodeExecutionData[] = []; - const executionResult = await this.executeFunctions.startJob( - 'javascript', - { - code: this.jsCode, - nodeMode: this.nodeMode, - workflowMode: this.workflowMode, - continueOnFail: this.executeFunctions.continueOnFail(), - }, - itemIndex, - ); + for (const chunk of chunks) { + const executionResult = await this.executeFunctions.startJob( + 'javascript', + { + code: this.jsCode, + nodeMode: this.nodeMode, + workflowMode: this.workflowMode, + continueOnFail: this.executeFunctions.continueOnFail(), + chunk: { + startIndex: chunk.startIdx, + count: chunk.count, + }, + }, + itemIndex, + ); - return executionResult.ok - ? executionResult.result - : this.throwExecutionError(executionResult.error); + if (!executionResult.ok) { + return this.throwExecutionError(executionResult.error); + } + + executionResults = executionResults.concat(executionResult.result); + } + + return executionResults; } private throwExecutionError(error: unknown): never { @@ -70,4 +84,22 @@ export class JsTaskRunnerSandbox { throw new ApplicationError(`Unknown error: ${JSON.stringify(error)}`); } + + /** Chunks the input items into chunks of 1000 items each */ + private chunkInputItems(numInputItems: number) { + const numChunks = Math.ceil(numInputItems / this.chunkSize); + const chunks = []; + + for (let i = 0; i < numChunks; i++) { + const startIdx = i * this.chunkSize; + const isLastChunk = i === numChunks - 1; + const count = isLastChunk ? numInputItems - startIdx : this.chunkSize; + chunks.push({ + startIdx, + count, + }); + } + + return chunks; + } } diff --git a/packages/nodes-base/nodes/Code/test/JsTaskRunnerSandbox.test.ts b/packages/nodes-base/nodes/Code/test/JsTaskRunnerSandbox.test.ts new file mode 100644 index 0000000000..00eb17544a --- /dev/null +++ b/packages/nodes-base/nodes/Code/test/JsTaskRunnerSandbox.test.ts @@ -0,0 +1,61 @@ +import { mock } from 'jest-mock-extended'; +import type { IExecuteFunctions } from 'n8n-workflow'; +import { createResultOk } from 'n8n-workflow'; + +import { JsTaskRunnerSandbox } from '../JsTaskRunnerSandbox'; + +describe('JsTaskRunnerSandbox', () => { + describe('runCodeForEachItem', () => { + it('should chunk the input items and execute the code for each chunk', async () => { + const jsCode = 'console.log($item);'; + const nodeMode = 'runOnceForEachItem'; + const workflowMode = 'manual'; + const executeFunctions = mock(); + const sandbox = new JsTaskRunnerSandbox(jsCode, nodeMode, workflowMode, executeFunctions, 2); + let i = 1; + executeFunctions.startJob.mockResolvedValue(createResultOk([{ json: { item: i++ } }])); + + const numInputItems = 5; + await sandbox.runCodeForEachItem(numInputItems); + + // eslint-disable-next-line @typescript-eslint/unbound-method + expect(executeFunctions.startJob).toHaveBeenCalledTimes(3); + const calls = executeFunctions.startJob.mock.calls; + expect(calls).toEqual([ + [ + 'javascript', + { + code: jsCode, + nodeMode, + workflowMode, + continueOnFail: executeFunctions.continueOnFail(), + chunk: { startIndex: 0, count: 2 }, + }, + 0, + ], + [ + 'javascript', + { + code: jsCode, + nodeMode, + workflowMode, + continueOnFail: executeFunctions.continueOnFail(), + chunk: { startIndex: 2, count: 2 }, + }, + 0, + ], + [ + 'javascript', + { + code: jsCode, + nodeMode, + workflowMode, + continueOnFail: executeFunctions.continueOnFail(), + chunk: { startIndex: 4, count: 1 }, + }, + 0, + ], + ]); + }); + }); +});