perf(core): Batch items sent in runonceforeachitem mode (no-changelog) (#11870)

Co-authored-by: Iván Ovejero <ivov.src@gmail.com>
Authored by Tomi Turtiainen on 2024-11-26 12:21:51 +02:00, committed by GitHub
parent 1adb730599
commit e22d0f3877
17 changed files with 457 additions and 83 deletions

View file

@@ -43,10 +43,6 @@ export class TaskRunnersConfig {
	@Env('N8N_RUNNERS_MAX_CONCURRENCY')
	maxConcurrency: number = 5;

-	/** Should the output of deduplication be asserted for correctness */
-	@Env('N8N_RUNNERS_ASSERT_DEDUPLICATION_OUTPUT')
-	assertDeduplicationOutput: boolean = false;
-
	/** How long (in seconds) a task is allowed to take for completion, else the task will be aborted and the runner restarted. Must be greater than 0. */
	@Env('N8N_RUNNERS_TASK_TIMEOUT')
	taskTimeout: number = 60;

View file

@@ -231,7 +231,6 @@ describe('GlobalConfig', () => {
			port: 5679,
			maxOldSpaceSize: '',
			maxConcurrency: 5,
-			assertDeduplicationOutput: false,
			taskTimeout: 60,
			heartbeatInterval: 30,
		},

View file

@@ -0,0 +1,91 @@
import { mock } from 'jest-mock-extended';
import type {
IExecuteData,
INode,
INodeExecutionData,
ITaskDataConnectionsSource,
} from 'n8n-workflow';
import type { DataRequestResponse, InputDataChunkDefinition } from '@/runner-types';
import { DataRequestResponseReconstruct } from '../data-request-response-reconstruct';
describe('DataRequestResponseReconstruct', () => {
const reconstruct = new DataRequestResponseReconstruct();
describe('reconstructConnectionInputItems', () => {
it('should return all input items if no chunk is provided', () => {
const inputData: DataRequestResponse['inputData'] = {
main: [[{ json: { key: 'value' } }]],
};
const result = reconstruct.reconstructConnectionInputItems(inputData);
expect(result).toEqual([{ json: { key: 'value' } }]);
});
it('should reconstruct sparse array when chunk is provided', () => {
const inputData: DataRequestResponse['inputData'] = {
main: [[{ json: { key: 'chunked' } }]],
};
const chunk: InputDataChunkDefinition = { startIndex: 2, count: 1 };
const result = reconstruct.reconstructConnectionInputItems(inputData, chunk);
expect(result).toEqual([undefined, undefined, { json: { key: 'chunked' } }, undefined]);
});
it('should handle empty input data gracefully', () => {
const inputData: DataRequestResponse['inputData'] = { main: [[]] };
const chunk: InputDataChunkDefinition = { startIndex: 1, count: 1 };
const result = reconstruct.reconstructConnectionInputItems(inputData, chunk);
expect(result).toEqual([undefined]);
});
});
describe('reconstructExecuteData', () => {
it('should reconstruct execute data with the provided input items', () => {
const node = mock<INode>();
const connectionInputSource = mock<ITaskDataConnectionsSource>();
const response = mock<DataRequestResponse>({
inputData: { main: [[]] },
node,
connectionInputSource,
});
const inputItems: INodeExecutionData[] = [{ json: { key: 'reconstructed' } }];
const result = reconstruct.reconstructExecuteData(response, inputItems);
expect(result).toEqual<IExecuteData>({
data: {
main: [inputItems],
},
node: response.node,
source: response.connectionInputSource,
});
});
it('should handle empty input items gracefully', () => {
const node = mock<INode>();
const connectionInputSource = mock<ITaskDataConnectionsSource>();
const inputItems: INodeExecutionData[] = [];
const response = mock<DataRequestResponse>({
inputData: { main: [[{ json: { key: 'value' } }]] },
node,
connectionInputSource,
});
const result = reconstruct.reconstructExecuteData(response, inputItems);
expect(result).toEqual<IExecuteData>({
data: {
main: [inputItems],
},
node: response.node,
source: response.connectionInputSource,
});
});
});
});

View file

@@ -1,6 +1,6 @@
-import type { IExecuteData, INodeExecutionData } from 'n8n-workflow';
+import type { IExecuteData, INodeExecutionData, ITaskDataConnections } from 'n8n-workflow';

-import type { DataRequestResponse } from '@/runner-types';
+import type { DataRequestResponse, InputDataChunkDefinition } from '@/runner-types';

/**
 * Reconstructs data from a DataRequestResponse to the initial
@@ -8,20 +8,43 @@ import type { DataRequestResponse } from '@/runner-types';
 */
export class DataRequestResponseReconstruct {
	/**
-	 * Reconstructs `connectionInputData` from a DataRequestResponse
+	 * Reconstructs `inputData` from a DataRequestResponse
	 */
-	reconstructConnectionInputData(
+	reconstructConnectionInputItems(
		inputData: DataRequestResponse['inputData'],
-	): INodeExecutionData[] {
-		return inputData?.main?.[0] ?? [];
+		chunk?: InputDataChunkDefinition,
+	): Array<INodeExecutionData | undefined> {
+		const inputItems = inputData?.main?.[0] ?? [];
+		if (!chunk) {
+			return inputItems;
+		}
+
+		// Only a chunk of the input items was requested. We reconstruct
+		// the array by filling in the missing items with `undefined`.
+		let sparseInputItems: Array<INodeExecutionData | undefined> = [];
+
+		sparseInputItems = sparseInputItems
+			.concat(Array.from({ length: chunk.startIndex }))
+			.concat(inputItems)
+			.concat(Array.from({ length: inputItems.length - chunk.startIndex - chunk.count }));
+
+		return sparseInputItems;
	}

	/**
	 * Reconstruct `executeData` from a DataRequestResponse
	 */
-	reconstructExecuteData(response: DataRequestResponse): IExecuteData {
+	reconstructExecuteData(
+		response: DataRequestResponse,
+		inputItems: INodeExecutionData[],
+	): IExecuteData {
+		const inputData: ITaskDataConnections = {
+			...response.inputData,
+			main: [inputItems],
+		};
+
		return {
-			data: response.inputData,
+			data: inputData,
			node: response.node,
			source: response.connectionInputSource,
		};
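
To make the reconstruction above concrete: the broker only ships the requested slice of items, and the runner re-inflates it into a sparse array so item indices stay stable. A minimal standalone sketch, not part of the commit; the fixture values and variable names are invented for illustration:

import type { INodeExecutionData } from 'n8n-workflow';

// Hypothetical fixture: six input items, of which only a chunk of two is requested.
const allItems: INodeExecutionData[] = Array.from({ length: 6 }, (_, i) => ({ json: { i } }));
const chunk = { startIndex: 2, count: 2 };

// Broker side: only the requested slice crosses the wire (see the stripper changes below).
const sentItems = allItems.slice(chunk.startIndex, chunk.startIndex + chunk.count);

// Runner side: pad the front with `undefined` so the surviving items keep their
// original indices when the Code node iterates over the chunk.
const padding: undefined[] = new Array(chunk.startIndex).fill(undefined);
const rebuilt: Array<INodeExecutionData | undefined> = [...padding, ...sentItems];

console.log(rebuilt); // [ undefined, undefined, { json: { i: 2 } }, { json: { i: 3 } } ]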

View file

@@ -10,7 +10,7 @@ import { ExecutionError } from '@/js-task-runner/errors/execution-error';
import { ValidationError } from '@/js-task-runner/errors/validation-error';
import type { JSExecSettings } from '@/js-task-runner/js-task-runner';
import { JsTaskRunner } from '@/js-task-runner/js-task-runner';
-import type { DataRequestResponse } from '@/runner-types';
+import type { DataRequestResponse, InputDataChunkDefinition } from '@/runner-types';
import type { Task } from '@/task-runner';
import {
@@ -95,17 +95,19 @@ describe('JsTaskRunner', () => {
		inputItems,
		settings,
		runner,
+		chunk,
	}: {
		code: string;
		inputItems: IDataObject[];
		settings?: Partial<JSExecSettings>;
		runner?: JsTaskRunner;
+		chunk?: InputDataChunkDefinition;
	}) => {
		return await execTaskWithParams({
			task: newTaskWithSettings({
				code,
				nodeMode: 'runOnceForEachItem',
+				chunk,
				...settings,
			}),
			taskData: newDataRequestResponse(inputItems.map(wrapIntoJson)),
@@ -509,6 +511,28 @@ describe('JsTaskRunner', () => {
			);
		});

+		describe('chunked execution', () => {
+			it('should use correct index for each item', async () => {
+				const outcome = await executeForEachItem({
+					code: 'return { ...$json, idx: $itemIndex }',
+					inputItems: [{ a: 1 }, { b: 2 }, { c: 3 }],
+					chunk: {
+						startIndex: 100,
+						count: 3,
+					},
+				});
+
+				expect(outcome).toEqual({
+					result: [
+						withPairedItem(100, wrapIntoJson({ a: 1, idx: 100 })),
+						withPairedItem(101, wrapIntoJson({ b: 2, idx: 101 })),
+						withPairedItem(102, wrapIntoJson({ c: 3, idx: 102 })),
+					],
+					customData: undefined,
+				});
+			});
+		});
+
		it('should return static items', async () => {
			const outcome = await executeForEachItem({
				code: 'return {json: {b: 1}}',
@@ -801,7 +825,6 @@ describe('JsTaskRunner', () => {
			code: 'unknown; return []',
			nodeMode: 'runOnceForAllItems',
			continueOnFail: false,
-			mode: 'manual',
			workflowMode: 'manual',
		});
		runner.runningTasks.set(taskId, task);

View file

@@ -16,7 +16,6 @@ export const newTaskWithSettings = (
	settings: {
		workflowMode: 'manual',
		continueOnFail: false,
-		mode: 'manual',
		...settings,
	},
	active: true,

View file

@@ -1,14 +1,17 @@
import { BuiltInsParserState } from '../built-ins-parser-state';

describe('BuiltInsParserState', () => {
-	describe('toDataRequestSpecification', () => {
+	describe('toDataRequestParams', () => {
		it('should return empty array when no properties are marked as needed', () => {
			const state = new BuiltInsParserState();

			expect(state.toDataRequestParams()).toEqual({
				dataOfNodes: [],
				env: false,
-				input: false,
+				input: {
+					chunk: undefined,
+					include: false,
+				},
				prevNode: false,
			});
		});
@@ -20,7 +23,10 @@ describe('BuiltInsParserState', () => {
			expect(state.toDataRequestParams()).toEqual({
				dataOfNodes: 'all',
				env: false,
-				input: true,
+				input: {
+					chunk: undefined,
+					include: true,
+				},
				prevNode: false,
			});
		});
@@ -33,7 +39,10 @@ describe('BuiltInsParserState', () => {
			expect(state.toDataRequestParams()).toEqual({
				dataOfNodes: ['Node1', 'Node2'],
				env: false,
-				input: false,
+				input: {
+					chunk: undefined,
+					include: false,
+				},
				prevNode: false,
			});
		});
@@ -47,7 +56,10 @@ describe('BuiltInsParserState', () => {
			expect(state.toDataRequestParams()).toEqual({
				dataOfNodes: 'all',
				env: false,
-				input: true,
+				input: {
+					chunk: undefined,
+					include: true,
+				},
				prevNode: false,
			});
		});
@@ -59,7 +71,10 @@ describe('BuiltInsParserState', () => {
			expect(state.toDataRequestParams()).toEqual({
				dataOfNodes: [],
				env: true,
-				input: false,
+				input: {
+					chunk: undefined,
+					include: false,
+				},
				prevNode: false,
			});
		});
@@ -71,7 +86,33 @@ describe('BuiltInsParserState', () => {
			expect(state.toDataRequestParams()).toEqual({
				dataOfNodes: [],
				env: false,
-				input: true,
+				input: {
+					chunk: undefined,
+					include: true,
+				},
+				prevNode: false,
+			});
+		});
+
+		it('should use the given chunk', () => {
+			const state = new BuiltInsParserState();
+			state.markInputAsNeeded();
+
+			expect(
+				state.toDataRequestParams({
+					count: 10,
+					startIndex: 5,
+				}),
+			).toEqual({
+				dataOfNodes: [],
+				env: false,
+				input: {
+					chunk: {
+						count: 10,
+						startIndex: 5,
+					},
+					include: true,
+				},
				prevNode: false,
			});
		});
@@ -83,7 +124,10 @@ describe('BuiltInsParserState', () => {
			expect(state.toDataRequestParams()).toEqual({
				dataOfNodes: [],
				env: false,
-				input: false,
+				input: {
+					chunk: undefined,
+					include: false,
+				},
				prevNode: true,
			});
		});
@@ -98,7 +142,10 @@ describe('BuiltInsParserState', () => {
			expect(state.toDataRequestParams()).toEqual({
				dataOfNodes: 'all',
				env: true,
-				input: true,
+				input: {
+					chunk: undefined,
+					include: true,
+				},
				prevNode: true,
			});
		});
@@ -109,7 +156,10 @@ describe('BuiltInsParserState', () => {
			expect(state.toDataRequestParams()).toEqual({
				dataOfNodes: 'all',
				env: true,
-				input: true,
+				input: {
+					chunk: undefined,
+					include: true,
+				},
				prevNode: true,
			});
		});

View file

@@ -1,4 +1,5 @@
import type { BrokerMessage } from '@/message-types';
+import type { InputDataChunkDefinition } from '@/runner-types';

/**
 * Class to keep track of which built-in variables are accessed in the code
@@ -53,11 +54,16 @@ export class BuiltInsParserState {
		this.needs$prevNode = true;
	}

-	toDataRequestParams(): BrokerMessage.ToRequester.TaskDataRequest['requestParams'] {
+	toDataRequestParams(
+		chunk?: InputDataChunkDefinition,
+	): BrokerMessage.ToRequester.TaskDataRequest['requestParams'] {
		return {
			dataOfNodes: this.needsAllNodes ? 'all' : Array.from(this.neededNodeNames),
			env: this.needs$env,
-			input: this.needs$input,
+			input: {
+				include: this.needs$input,
+				chunk,
+			},
			prevNode: this.needs$prevNode,
		};
	}

View file

@@ -19,7 +19,12 @@ import * as a from 'node:assert';
import { runInNewContext, type Context } from 'node:vm';

import type { MainConfig } from '@/config/main-config';
-import type { DataRequestResponse, PartialAdditionalData, TaskResultData } from '@/runner-types';
+import type {
+	DataRequestResponse,
+	InputDataChunkDefinition,
+	PartialAdditionalData,
+	TaskResultData,
+} from '@/runner-types';
import { type Task, TaskRunner } from '@/task-runner';

import { BuiltInsParser } from './built-ins-parser/built-ins-parser';
@@ -37,9 +42,8 @@ export interface JSExecSettings {
	nodeMode: CodeExecutionMode;
	workflowMode: WorkflowExecuteMode;
	continueOnFail: boolean;

-	// For workflow data proxy
-	mode: WorkflowExecuteMode;
+	// For executing partial input data
+	chunk?: InputDataChunkDefinition;
}

export interface JsTaskData {
@@ -94,6 +98,8 @@ export class JsTaskRunner extends TaskRunner {
		const settings = task.settings;
		a.ok(settings, 'JS Code not sent to runner');

+		this.validateTaskSettings(settings);
+
		const neededBuiltInsResult = this.builtInsParser.parseUsedBuiltIns(settings.code);
		const neededBuiltIns = neededBuiltInsResult.ok
			? neededBuiltInsResult.result
@@ -101,10 +107,10 @@
		const dataResponse = await this.requestData<DataRequestResponse>(
			task.taskId,
-			neededBuiltIns.toDataRequestParams(),
+			neededBuiltIns.toDataRequestParams(settings.chunk),
		);

-		const data = this.reconstructTaskData(dataResponse);
+		const data = this.reconstructTaskData(dataResponse, settings.chunk);

		await this.requestNodeTypeIfNeeded(neededBuiltIns, data.workflow, task.taskId);
@@ -136,6 +142,14 @@
		};
	}

+	private validateTaskSettings(settings: JSExecSettings) {
+		a.ok(settings.code, 'No code to execute');
+
+		if (settings.nodeMode === 'runOnceForAllItems') {
+			a.ok(settings.chunk === undefined, 'Chunking is not supported for runOnceForAllItems');
+		}
+	}
+
	private getNativeVariables() {
		return {
			// Exposed Node.js globals in vm2
@@ -220,7 +234,13 @@
		const inputItems = data.connectionInputData;
		const returnData: INodeExecutionData[] = [];

-		for (let index = 0; index < inputItems.length; index++) {
+		// If a chunk was requested, only process the items in the chunk
+		const chunkStartIdx = settings.chunk ? settings.chunk.startIndex : 0;
+		const chunkEndIdx = settings.chunk
+			? settings.chunk.startIndex + settings.chunk.count
+			: inputItems.length;
+
+		for (let index = chunkStartIdx; index < chunkEndIdx; index++) {
			const item = inputItems[index];
			const dataProxy = this.createDataProxy(data, workflow, index);
			const context: Context = {
@@ -325,13 +345,24 @@
		return new ExecutionError({ message: JSON.stringify(error) });
	}

-	private reconstructTaskData(response: DataRequestResponse): JsTaskData {
+	private reconstructTaskData(
+		response: DataRequestResponse,
+		chunk?: InputDataChunkDefinition,
+	): JsTaskData {
+		const inputData = this.taskDataReconstruct.reconstructConnectionInputItems(
+			response.inputData,
+			chunk,
+			// This type assertion is intentional. Chunking is only supported in
+			// runOnceForEachItem mode and if a chunk was requested, we intentionally
+			// fill the array with undefined values for the items outside the chunk.
+			// We only iterate over the chunk items but WorkflowDataProxy expects
+			// the full array of items.
+		) as INodeExecutionData[];
+
		return {
			...response,
-			connectionInputData: this.taskDataReconstruct.reconstructConnectionInputData(
-				response.inputData,
-			),
-			executeData: this.taskDataReconstruct.reconstructExecuteData(response),
+			connectionInputData: inputData,
+			executeData: this.taskDataReconstruct.reconstructExecuteData(response, inputData),
		};
	}
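
The chunk therefore only narrows the loop bounds; per-item execution is otherwise untouched. A rough sketch of the index arithmetic with invented values (the real bounds are computed from settings.chunk in the hunk above):

// Hypothetical task settings: the requester asked for items 100..102.
const chunk: { startIndex: number; count: number } | undefined = { startIndex: 100, count: 3 };
const totalInputItems = 5000; // only relevant when no chunk was requested

const chunkStartIdx = chunk ? chunk.startIndex : 0;
const chunkEndIdx = chunk ? chunk.startIndex + chunk.count : totalInputItems;

// Iterates indices 100, 101, 102 — the same `$itemIndex` values the
// chunked-execution test above asserts on.
for (let index = chunkStartIdx; index < chunkEndIdx; index++) {
	console.log(index);
}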

View file

@@ -15,6 +15,18 @@ import type {
	WorkflowParameters,
} from 'n8n-workflow';

+export interface InputDataChunkDefinition {
+	startIndex: number;
+	count: number;
+}
+
+export interface InputDataRequestParams {
+	/** Whether to include the input data in the response */
+	include: boolean;
+	/** Optionally request only a specific chunk of data instead of all input data */
+	chunk?: InputDataChunkDefinition;
+}
+
/**
 * Specifies what data should be included for a task data request.
 */
@@ -22,7 +34,7 @@ export interface TaskDataRequestParams {
	dataOfNodes: string[] | 'all';
	prevNode: boolean;
	/** Whether input data for the node should be included */
-	input: boolean;
+	input: InputDataRequestParams;
	/** Whether env provider's state should be included */
	env: boolean;
}
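
For example, under the new shape a data request that wants input data but only the first 1000 items would look roughly like this (hand-written illustration, not taken from the commit):

import type { TaskDataRequestParams } from '@/runner-types';

// Hypothetical request params: all node data plus env state, with input
// limited to the first chunk of 1000 items.
const requestParams: TaskDataRequestParams = {
	dataOfNodes: 'all',
	prevNode: true,
	input: {
		include: true,
		chunk: { startIndex: 0, count: 1000 },
	},
	env: true,
};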

View file

@@ -524,7 +524,9 @@ describe('TaskBroker', () => {
		const requestParams: RunnerMessage.ToBroker.TaskDataRequest['requestParams'] = {
			dataOfNodes: 'all',
			env: true,
-			input: true,
+			input: {
+				include: true,
+			},
			prevNode: true,
		};

View file

@@ -18,6 +18,7 @@ const workflow: DataRequestResponse['workflow'] = mock<DataRequestResponse['work
const debugHelperNodeOutItems: INodeExecutionData[] = [
	{
		json: {
+			idx: 0,
			uid: 'abb74fd4-bef2-4fae-9d53-ea24e9eb3032',
			email: 'Dan.Schmidt31@yahoo.com',
			firstname: 'Toni',
@@ -28,6 +29,31 @@ const debugHelperNodeOutItems: INodeExecutionData[] = [
			item: 0,
		},
	},
+	{
+		json: {
+			idx: 1,
+			uid: '4620e4-c1b4-dd8b-9a45-d0f9a29a3b7f',
+			email: 'bob.johnson@domain.com',
+			firstName: 'Bob',
+			lastName: 'Johnson',
+			password: '6e41b5ecf',
+		},
+		pairedItem: {
+			item: 1,
+		},
+	},
+	{
+		json: {
+			idx: 2,
+			email: '4358d3-418b-a8b3-49cb-076b1180f402',
+			firstName: 'Eve',
+			lastName: 'Johnson',
+			password: 'e2414620e',
+		},
+		pairedItem: {
+			item: 2,
+		},
+	},
];

const codeNodeInputItems: INodeExecutionData[] = debugHelperNodeOutItems;
const envProviderState: DataRequestResponse['envProviderState'] = mock<
@@ -137,7 +163,9 @@ describe('DataRequestResponseStripper', () => {
	const allDataParam: TaskDataRequestParams = {
		dataOfNodes: 'all',
		env: true,
-		input: true,
+		input: {
+			include: true,
+		},
		prevNode: true,
	};
@@ -177,7 +205,9 @@
	describe('input data', () => {
		const allExceptInputParam = newRequestParam({
-			input: false,
+			input: {
+				include: false,
+			},
		});

		it('drops input data from result', () => {
@@ -186,10 +216,23 @@
			expect(result.inputData).toStrictEqual({});
		});

-		it('drops input data from result', () => {
-			const result = new DataRequestResponseStripper(taskData, allExceptInputParam).strip();
+		it('returns only chunked data', () => {
+			const result = new DataRequestResponseStripper(
+				taskData,
+				newRequestParam({
+					input: {
+						include: true,
+						chunk: {
+							startIndex: 1,
+							count: 1,
+						},
+					},
+				}),
+			).strip();

-			expect(result.inputData).toStrictEqual({});
+			expect(result.inputData).toStrictEqual({
+				main: [debugHelperNodeOutItems.slice(1, 2)],
+			});
		});
	});

View file

@@ -6,6 +6,7 @@ import type {
	IRunExecutionData,
	ITaskDataConnections,
} from 'n8n-workflow';
+import * as a from 'node:assert/strict';

/**
 * Strips data from data request response based on the specified parameters
@@ -81,11 +82,31 @@ export class DataRequestResponseStripper {
	}

	private stripInputData(inputData: ITaskDataConnections): ITaskDataConnections {
-		if (this.stripParams.input) {
+		if (!this.stripParams.input.include) {
+			return {};
+		}
+
+		return this.stripParams.input.chunk ? this.stripChunkedInputData(inputData) : inputData;
+	}
+
+	private stripChunkedInputData(inputData: ITaskDataConnections): ITaskDataConnections {
+		const { chunk } = this.stripParams.input;
+		a.ok(chunk);
+
+		const inputItems = inputData.main?.[0];
+		if (!inputItems) {
			return inputData;
		}

-		return {};
+		// If a chunk of the input data is requested, we only return that chunk
+		// It is the responsibility of the requester to rebuild the input data
+		const chunkInputItems = inputItems.slice(chunk.startIndex, chunk.startIndex + chunk.count);
+		const chunkedInputData: ITaskDataConnections = {
+			...inputData,
+			main: [chunkInputItems],
+		};
+
+		return chunkedInputData;
	}

	/**

View file

@@ -1,6 +1,5 @@
-import { TaskRunnersConfig } from '@n8n/config';
import type { TaskResultData, RequesterMessage, BrokerMessage, TaskData } from '@n8n/task-runner';
-import { DataRequestResponseReconstruct, RPC_ALLOW_LIST } from '@n8n/task-runner';
+import { RPC_ALLOW_LIST } from '@n8n/task-runner';
import type {
	EnvProviderState,
	IExecuteFunctions,
@@ -18,8 +17,7 @@ import type {
} from 'n8n-workflow';
import { createResultOk, createResultError } from 'n8n-workflow';
import { nanoid } from 'nanoid';
-import * as a from 'node:assert/strict';
-import Container, { Service } from 'typedi';
+import { Service } from 'typedi';

import { NodeTypes } from '@/node-types';
@@ -59,8 +57,6 @@ export abstract class TaskManager {
	tasks: Map<string, Task> = new Map();

-	private readonly runnerConfig = Container.get(TaskRunnersConfig);
-
	private readonly dataResponseBuilder = new DataRequestResponseBuilder();

	constructor(private readonly nodeTypes: NodeTypes) {}
@@ -246,18 +242,6 @@
		const dataRequestResponse = this.dataResponseBuilder.buildFromTaskData(job.data);

-		if (this.runnerConfig.assertDeduplicationOutput) {
-			const reconstruct = new DataRequestResponseReconstruct();
-			a.deepStrictEqual(
-				reconstruct.reconstructConnectionInputData(dataRequestResponse.inputData),
-				job.data.connectionInputData,
-			);
-			a.deepStrictEqual(
-				reconstruct.reconstructExecuteData(dataRequestResponse),
-				job.data.executeData,
-			);
-		}
-
		const strippedData = new DataRequestResponseStripper(
			dataRequestResponse,
			requestParams,

View file

@@ -111,10 +111,11 @@ export class Code implements INodeType {
		if (runnersConfig.enabled && language === 'javaScript') {
			const code = this.getNodeParameter(codeParameterName, 0) as string;
			const sandbox = new JsTaskRunnerSandbox(code, nodeMode, workflowMode, this);
+			const numInputItems = this.getInputData().length;

			return nodeMode === 'runOnceForAllItems'
				? [await sandbox.runCodeAllItems()]
-				: [await sandbox.runCodeForEachItem()];
+				: [await sandbox.runCodeForEachItem(numInputItems)];
		}

		const getSandbox = (index = 0) => {

View file

@@ -18,6 +18,7 @@ export class JsTaskRunnerSandbox {
		private readonly nodeMode: CodeExecutionMode,
		private readonly workflowMode: WorkflowExecuteMode,
		private readonly executeFunctions: IExecuteFunctions,
+		private readonly chunkSize = 1000,
	) {}

	async runCodeAllItems(): Promise<INodeExecutionData[]> {
@@ -39,24 +40,37 @@
			: this.throwExecutionError(executionResult.error);
	}

-	async runCodeForEachItem(): Promise<INodeExecutionData[]> {
+	async runCodeForEachItem(numInputItems: number): Promise<INodeExecutionData[]> {
		validateNoDisallowedMethodsInRunForEach(this.jsCode, 0);
		const itemIndex = 0;

-		const executionResult = await this.executeFunctions.startJob<INodeExecutionData[]>(
-			'javascript',
-			{
-				code: this.jsCode,
-				nodeMode: this.nodeMode,
-				workflowMode: this.workflowMode,
-				continueOnFail: this.executeFunctions.continueOnFail(),
-			},
-			itemIndex,
-		);
-
-		return executionResult.ok
-			? executionResult.result
-			: this.throwExecutionError(executionResult.error);
+		const chunks = this.chunkInputItems(numInputItems);
+		let executionResults: INodeExecutionData[] = [];
+
+		for (const chunk of chunks) {
+			const executionResult = await this.executeFunctions.startJob<INodeExecutionData[]>(
+				'javascript',
+				{
+					code: this.jsCode,
+					nodeMode: this.nodeMode,
+					workflowMode: this.workflowMode,
+					continueOnFail: this.executeFunctions.continueOnFail(),
+					chunk: {
+						startIndex: chunk.startIdx,
+						count: chunk.count,
+					},
+				},
+				itemIndex,
+			);
+
+			if (!executionResult.ok) {
+				return this.throwExecutionError(executionResult.error);
+			}
+
+			executionResults = executionResults.concat(executionResult.result);
+		}
+
+		return executionResults;
	}

	private throwExecutionError(error: unknown): never {
@@ -70,4 +84,22 @@
		throw new ApplicationError(`Unknown error: ${JSON.stringify(error)}`);
	}

+	/** Chunks the input items into chunks of 1000 items each */
+	private chunkInputItems(numInputItems: number) {
+		const numChunks = Math.ceil(numInputItems / this.chunkSize);
+		const chunks = [];
+
+		for (let i = 0; i < numChunks; i++) {
+			const startIdx = i * this.chunkSize;
+			const isLastChunk = i === numChunks - 1;
+			const count = isLastChunk ? numInputItems - startIdx : this.chunkSize;
+
+			chunks.push({
+				startIdx,
+				count,
+			});
+		}
+
+		return chunks;
+	}
}
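
As a quick sanity check of the chunking arithmetic, the same splitting logic restated standalone for five items with a chunk size of 2 produces three chunks, which mirrors the expectations in the test file below (illustrative only, not part of the commit):

// Restates chunkInputItems outside the class: 5 items with chunkSize 2
// should yield three chunks, the last one holding the single leftover item.
const chunkSize = 2;
const numInputItems = 5;

const chunks: Array<{ startIdx: number; count: number }> = [];
const numChunks = Math.ceil(numInputItems / chunkSize);

for (let i = 0; i < numChunks; i++) {
	const startIdx = i * chunkSize;
	const isLastChunk = i === numChunks - 1;
	chunks.push({ startIdx, count: isLastChunk ? numInputItems - startIdx : chunkSize });
}

console.log(chunks);
// [ { startIdx: 0, count: 2 }, { startIdx: 2, count: 2 }, { startIdx: 4, count: 1 } ]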

View file

@@ -0,0 +1,61 @@
import { mock } from 'jest-mock-extended';
import type { IExecuteFunctions } from 'n8n-workflow';
import { createResultOk } from 'n8n-workflow';
import { JsTaskRunnerSandbox } from '../JsTaskRunnerSandbox';
describe('JsTaskRunnerSandbox', () => {
describe('runCodeForEachItem', () => {
it('should chunk the input items and execute the code for each chunk', async () => {
const jsCode = 'console.log($item);';
const nodeMode = 'runOnceForEachItem';
const workflowMode = 'manual';
const executeFunctions = mock<IExecuteFunctions>();
const sandbox = new JsTaskRunnerSandbox(jsCode, nodeMode, workflowMode, executeFunctions, 2);
let i = 1;
executeFunctions.startJob.mockResolvedValue(createResultOk([{ json: { item: i++ } }]));
const numInputItems = 5;
await sandbox.runCodeForEachItem(numInputItems);
// eslint-disable-next-line @typescript-eslint/unbound-method
expect(executeFunctions.startJob).toHaveBeenCalledTimes(3);
const calls = executeFunctions.startJob.mock.calls;
expect(calls).toEqual([
[
'javascript',
{
code: jsCode,
nodeMode,
workflowMode,
continueOnFail: executeFunctions.continueOnFail(),
chunk: { startIndex: 0, count: 2 },
},
0,
],
[
'javascript',
{
code: jsCode,
nodeMode,
workflowMode,
continueOnFail: executeFunctions.continueOnFail(),
chunk: { startIndex: 2, count: 2 },
},
0,
],
[
'javascript',
{
code: jsCode,
nodeMode,
workflowMode,
continueOnFail: executeFunctions.continueOnFail(),
chunk: { startIndex: 4, count: 1 },
},
0,
],
]);
});
});
});