perf(core): Deduplicate task runner data request response (no-changelog) (#11583)

Co-authored-by: Iván Ovejero <ivov.src@gmail.com>
This commit is contained in:
Tomi Turtiainen 2024-11-07 11:24:00 +02:00 committed by GitHub
parent 8cba100488
commit 3111dece72
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 672 additions and 572 deletions

View file

@ -50,4 +50,8 @@ export class TaskRunnersConfig {
/** How many concurrent tasks can a runner execute at a time */ /** How many concurrent tasks can a runner execute at a time */
@Env('N8N_RUNNERS_MAX_CONCURRENCY') @Env('N8N_RUNNERS_MAX_CONCURRENCY')
maxConcurrency: number = 5; maxConcurrency: number = 5;
/** Should the output of deduplication be asserted for correctness */
@Env('N8N_RUNNERS_ASSERT_DEDUPLICATION_OUTPUT')
assertDeduplicationOutput: boolean = false;
} }

View file

@ -233,6 +233,7 @@ describe('GlobalConfig', () => {
launcherRunner: 'javascript', launcherRunner: 'javascript',
maxOldSpaceSize: '', maxOldSpaceSize: '',
maxConcurrency: 5, maxConcurrency: 5,
assertDeduplicationOutput: false,
}, },
sentry: { sentry: {
backendDsn: '', backendDsn: '',

View file

@ -0,0 +1,29 @@
import type { IExecuteData, INodeExecutionData } from 'n8n-workflow';
import type { DataRequestResponse } from '@/runner-types';
/**
* Reconstructs data from a DataRequestResponse to the initial
* data structures.
*/
export class DataRequestResponseReconstruct {
/**
* Reconstructs `connectionInputData` from a DataRequestResponse
*/
reconstructConnectionInputData(
inputData: DataRequestResponse['inputData'],
): INodeExecutionData[] {
return inputData?.main?.[0] ?? [];
}
/**
* Reconstruct `executeData` from a DataRequestResponse
*/
reconstructExecuteData(response: DataRequestResponse): IExecuteData {
return {
data: response.inputData,
node: response.node,
source: response.connectionInputSource,
};
}
}

View file

@ -1,3 +1,4 @@
export * from './task-runner'; export * from './task-runner';
export * from './runner-types'; export * from './runner-types';
export * from './message-types'; export * from './message-types';
export * from './data-request/data-request-response-reconstruct';

View file

@ -3,15 +3,21 @@ import type { CodeExecutionMode, IDataObject } from 'n8n-workflow';
import fs from 'node:fs'; import fs from 'node:fs';
import { builtinModules } from 'node:module'; import { builtinModules } from 'node:module';
import type { JsRunnerConfig } from '@/config/js-runner-config';
import { MainConfig } from '@/config/main-config';
import { ExecutionError } from '@/js-task-runner/errors/execution-error';
import { ValidationError } from '@/js-task-runner/errors/validation-error'; import { ValidationError } from '@/js-task-runner/errors/validation-error';
import type { DataRequestResponse, JSExecSettings } from '@/js-task-runner/js-task-runner'; import type { JSExecSettings } from '@/js-task-runner/js-task-runner';
import { JsTaskRunner } from '@/js-task-runner/js-task-runner'; import { JsTaskRunner } from '@/js-task-runner/js-task-runner';
import type { DataRequestResponse } from '@/runner-types';
import type { Task } from '@/task-runner'; import type { Task } from '@/task-runner';
import { newCodeTaskData, newTaskWithSettings, withPairedItem, wrapIntoJson } from './test-data'; import {
import type { JsRunnerConfig } from '../../config/js-runner-config'; newDataRequestResponse,
import { MainConfig } from '../../config/main-config'; newTaskWithSettings,
import { ExecutionError } from '../errors/execution-error'; withPairedItem,
wrapIntoJson,
} from './test-data';
jest.mock('ws'); jest.mock('ws');
@ -68,7 +74,7 @@ describe('JsTaskRunner', () => {
nodeMode: 'runOnceForAllItems', nodeMode: 'runOnceForAllItems',
...settings, ...settings,
}), }),
taskData: newCodeTaskData(inputItems.map(wrapIntoJson)), taskData: newDataRequestResponse(inputItems.map(wrapIntoJson)),
runner, runner,
}); });
}; };
@ -91,7 +97,7 @@ describe('JsTaskRunner', () => {
nodeMode: 'runOnceForEachItem', nodeMode: 'runOnceForEachItem',
...settings, ...settings,
}), }),
taskData: newCodeTaskData(inputItems.map(wrapIntoJson)), taskData: newDataRequestResponse(inputItems.map(wrapIntoJson)),
runner, runner,
}); });
}; };
@ -108,7 +114,7 @@ describe('JsTaskRunner', () => {
await execTaskWithParams({ await execTaskWithParams({
task, task,
taskData: newCodeTaskData([wrapIntoJson({})]), taskData: newDataRequestResponse([wrapIntoJson({})]),
}); });
expect(defaultTaskRunner.makeRpcCall).toHaveBeenCalledWith(task.taskId, 'logNodeOutput', [ expect(defaultTaskRunner.makeRpcCall).toHaveBeenCalledWith(task.taskId, 'logNodeOutput', [
@ -243,7 +249,7 @@ describe('JsTaskRunner', () => {
code: 'return { val: $env.VAR1 }', code: 'return { val: $env.VAR1 }',
nodeMode: 'runOnceForAllItems', nodeMode: 'runOnceForAllItems',
}), }),
taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), {
envProviderState: { envProviderState: {
isEnvAccessBlocked: false, isEnvAccessBlocked: false,
isProcessAvailable: true, isProcessAvailable: true,
@ -262,7 +268,7 @@ describe('JsTaskRunner', () => {
code: 'return { val: $env.VAR1 }', code: 'return { val: $env.VAR1 }',
nodeMode: 'runOnceForAllItems', nodeMode: 'runOnceForAllItems',
}), }),
taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), {
envProviderState: { envProviderState: {
isEnvAccessBlocked: true, isEnvAccessBlocked: true,
isProcessAvailable: true, isProcessAvailable: true,
@ -279,7 +285,7 @@ describe('JsTaskRunner', () => {
code: 'return Object.values($env).concat(Object.keys($env))', code: 'return Object.values($env).concat(Object.keys($env))',
nodeMode: 'runOnceForAllItems', nodeMode: 'runOnceForAllItems',
}), }),
taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), {
envProviderState: { envProviderState: {
isEnvAccessBlocked: false, isEnvAccessBlocked: false,
isProcessAvailable: true, isProcessAvailable: true,
@ -298,7 +304,7 @@ describe('JsTaskRunner', () => {
code: 'return { val: $env.N8N_RUNNERS_N8N_URI }', code: 'return { val: $env.N8N_RUNNERS_N8N_URI }',
nodeMode: 'runOnceForAllItems', nodeMode: 'runOnceForAllItems',
}), }),
taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), {
envProviderState: undefined, envProviderState: undefined,
}), }),
}); });
@ -313,7 +319,7 @@ describe('JsTaskRunner', () => {
code: 'return { val: Buffer.from("test-buffer").toString() }', code: 'return { val: Buffer.from("test-buffer").toString() }',
nodeMode: 'runOnceForAllItems', nodeMode: 'runOnceForAllItems',
}), }),
taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), {
envProviderState: undefined, envProviderState: undefined,
}), }),
}); });
@ -325,7 +331,7 @@ describe('JsTaskRunner', () => {
code: 'return { val: Buffer.from("test-buffer").toString() }', code: 'return { val: Buffer.from("test-buffer").toString() }',
nodeMode: 'runOnceForEachItem', nodeMode: 'runOnceForEachItem',
}), }),
taskData: newCodeTaskData(inputItems.map(wrapIntoJson), { taskData: newDataRequestResponse(inputItems.map(wrapIntoJson), {
envProviderState: undefined, envProviderState: undefined,
}), }),
}); });
@ -771,7 +777,7 @@ describe('JsTaskRunner', () => {
code: 'unknown', code: 'unknown',
nodeMode, nodeMode,
}), }),
taskData: newCodeTaskData([wrapIntoJson({ a: 1 })]), taskData: newDataRequestResponse([wrapIntoJson({ a: 1 })]),
}), }),
).rejects.toThrow(ExecutionError); ).rejects.toThrow(ExecutionError);
}, },
@ -793,7 +799,7 @@ describe('JsTaskRunner', () => {
jest.spyOn(runner, 'sendOffers').mockImplementation(() => {}); jest.spyOn(runner, 'sendOffers').mockImplementation(() => {});
jest jest
.spyOn(runner, 'requestData') .spyOn(runner, 'requestData')
.mockResolvedValue(newCodeTaskData([wrapIntoJson({ a: 1 })])); .mockResolvedValue(newDataRequestResponse([wrapIntoJson({ a: 1 })]));
await runner.receivedSettings(taskId, task.settings); await runner.receivedSettings(taskId, task.settings);

View file

@ -2,7 +2,8 @@ import type { IDataObject, INode, INodeExecutionData, ITaskData } from 'n8n-work
import { NodeConnectionType } from 'n8n-workflow'; import { NodeConnectionType } from 'n8n-workflow';
import { nanoid } from 'nanoid'; import { nanoid } from 'nanoid';
import type { DataRequestResponse, JSExecSettings } from '@/js-task-runner/js-task-runner'; import type { JSExecSettings } from '@/js-task-runner/js-task-runner';
import type { DataRequestResponse } from '@/runner-types';
import type { Task } from '@/task-runner'; import type { Task } from '@/task-runner';
/** /**
@ -46,10 +47,10 @@ export const newTaskData = (opts: Partial<ITaskData> & Pick<ITaskData, 'source'>
}); });
/** /**
* Creates a new all code task data with the given options * Creates a new data request response with the given options
*/ */
export const newCodeTaskData = ( export const newDataRequestResponse = (
codeNodeInputData: INodeExecutionData[], inputData: INodeExecutionData[],
opts: Partial<DataRequestResponse> = {}, opts: Partial<DataRequestResponse> = {},
): DataRequestResponse => { ): DataRequestResponse => {
const codeNode = newNode({ const codeNode = newNode({
@ -83,9 +84,8 @@ export const newCodeTaskData = (
nodes: [manualTriggerNode, codeNode], nodes: [manualTriggerNode, codeNode],
}, },
inputData: { inputData: {
main: [codeNodeInputData], main: [inputData],
}, },
connectionInputData: codeNodeInputData,
node: codeNode, node: codeNode,
runExecutionData: { runExecutionData: {
startData: {}, startData: {},
@ -95,7 +95,7 @@ export const newCodeTaskData = (
newTaskData({ newTaskData({
source: [], source: [],
data: { data: {
main: [codeNodeInputData], main: [inputData],
}, },
}), }),
], ],
@ -137,14 +137,13 @@ export const newCodeTaskData = (
var: 'value', var: 'value',
}, },
}, },
executeData: { connectionInputSource: {
node: codeNode, main: [
data: { {
main: [codeNodeInputData], previousNode: 'Trigger',
}, previousNodeOutput: 0,
source: { },
main: [{ previousNode: manualTriggerNode.name }], ],
},
}, },
...opts, ...opts,
}; };

View file

@ -1,8 +1,13 @@
import { getAdditionalKeys } from 'n8n-core'; import { getAdditionalKeys } from 'n8n-core';
import type { IDataObject, INodeType, IWorkflowExecuteAdditionalData } from 'n8n-workflow'; import type {
IDataObject,
IExecuteData,
INodeType,
IWorkflowExecuteAdditionalData,
} from 'n8n-workflow';
import { Workflow, WorkflowDataProxy } from 'n8n-workflow'; import { Workflow, WorkflowDataProxy } from 'n8n-workflow';
import { newCodeTaskData } from '../../__tests__/test-data'; import { newDataRequestResponse } from '../../__tests__/test-data';
import { BuiltInsParser } from '../built-ins-parser'; import { BuiltInsParser } from '../built-ins-parser';
import { BuiltInsParserState } from '../built-ins-parser-state'; import { BuiltInsParserState } from '../built-ins-parser-state';
@ -159,7 +164,12 @@ describe('BuiltInsParser', () => {
describe('WorkflowDataProxy built-ins', () => { describe('WorkflowDataProxy built-ins', () => {
it('should have a known list of built-ins', () => { it('should have a known list of built-ins', () => {
const data = newCodeTaskData([]); const data = newDataRequestResponse([]);
const executeData: IExecuteData = {
data: {},
node: data.node,
source: data.connectionInputSource,
};
const dataProxy = new WorkflowDataProxy( const dataProxy = new WorkflowDataProxy(
new Workflow({ new Workflow({
...data.workflow, ...data.workflow,
@ -179,7 +189,7 @@ describe('BuiltInsParser', () => {
data.runIndex, data.runIndex,
0, 0,
data.activeNodeName, data.activeNodeName,
data.connectionInputData, [],
data.siblingParameters, data.siblingParameters,
data.mode, data.mode,
getAdditionalKeys( getAdditionalKeys(
@ -187,7 +197,7 @@ describe('BuiltInsParser', () => {
data.mode, data.mode,
data.runExecutionData, data.runExecutionData,
), ),
data.executeData, executeData,
data.defaultReturnRunIndex, data.defaultReturnRunIndex,
data.selfData, data.selfData,
data.contextNodeName, data.contextNodeName,

View file

@ -1,28 +1,25 @@
import { getAdditionalKeys } from 'n8n-core'; import { getAdditionalKeys } from 'n8n-core';
import { import { WorkflowDataProxy, Workflow } from 'n8n-workflow';
WorkflowDataProxy,
// type IWorkflowDataProxyAdditionalKeys,
Workflow,
} from 'n8n-workflow';
import type { import type {
CodeExecutionMode, CodeExecutionMode,
INode,
ITaskDataConnections,
IWorkflowExecuteAdditionalData, IWorkflowExecuteAdditionalData,
WorkflowParameters,
IDataObject, IDataObject,
IExecuteData,
INodeExecutionData, INodeExecutionData,
INodeParameters, INodeParameters,
IRunExecutionData,
WorkflowExecuteMode, WorkflowExecuteMode,
WorkflowParameters,
ITaskDataConnections,
INode,
IRunExecutionData,
EnvProviderState, EnvProviderState,
IExecuteData,
INodeTypeDescription, INodeTypeDescription,
} from 'n8n-workflow'; } from 'n8n-workflow';
import * as a from 'node:assert'; import * as a from 'node:assert';
import { runInNewContext, type Context } from 'node:vm'; import { runInNewContext, type Context } from 'node:vm';
import type { TaskResultData } from '@/runner-types'; import type { MainConfig } from '@/config/main-config';
import type { DataRequestResponse, PartialAdditionalData, TaskResultData } from '@/runner-types';
import { type Task, TaskRunner } from '@/task-runner'; import { type Task, TaskRunner } from '@/task-runner';
import { BuiltInsParser } from './built-ins-parser/built-ins-parser'; import { BuiltInsParser } from './built-ins-parser/built-ins-parser';
@ -33,7 +30,7 @@ import { makeSerializable } from './errors/serializable-error';
import type { RequireResolver } from './require-resolver'; import type { RequireResolver } from './require-resolver';
import { createRequireResolver } from './require-resolver'; import { createRequireResolver } from './require-resolver';
import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation'; import { validateRunForAllItemsOutput, validateRunForEachItemOutput } from './result-validation';
import type { MainConfig } from '../config/main-config'; import { DataRequestResponseReconstruct } from '../data-request/data-request-response-reconstruct';
export interface JSExecSettings { export interface JSExecSettings {
code: string; code: string;
@ -45,34 +42,19 @@ export interface JSExecSettings {
mode: WorkflowExecuteMode; mode: WorkflowExecuteMode;
} }
export interface PartialAdditionalData { export interface JsTaskData {
executionId?: string;
restartExecutionId?: string;
restApiUrl: string;
instanceBaseUrl: string;
formWaitingBaseUrl: string;
webhookBaseUrl: string;
webhookWaitingBaseUrl: string;
webhookTestBaseUrl: string;
currentNodeParameters?: INodeParameters;
executionTimeoutTimestamp?: number;
userId?: string;
variables: IDataObject;
}
export interface DataRequestResponse {
workflow: Omit<WorkflowParameters, 'nodeTypes'>; workflow: Omit<WorkflowParameters, 'nodeTypes'>;
inputData: ITaskDataConnections; inputData: ITaskDataConnections;
connectionInputData: INodeExecutionData[];
node: INode; node: INode;
runExecutionData: IRunExecutionData; runExecutionData: IRunExecutionData;
runIndex: number; runIndex: number;
itemIndex: number; itemIndex: number;
activeNodeName: string; activeNodeName: string;
connectionInputData: INodeExecutionData[];
siblingParameters: INodeParameters; siblingParameters: INodeParameters;
mode: WorkflowExecuteMode; mode: WorkflowExecuteMode;
envProviderState?: EnvProviderState; envProviderState: EnvProviderState;
executeData?: IExecuteData; executeData?: IExecuteData;
defaultReturnRunIndex: number; defaultReturnRunIndex: number;
selfData: IDataObject; selfData: IDataObject;
@ -89,6 +71,8 @@ export class JsTaskRunner extends TaskRunner {
private readonly builtInsParser = new BuiltInsParser(); private readonly builtInsParser = new BuiltInsParser();
private readonly taskDataReconstruct = new DataRequestResponseReconstruct();
constructor(config: MainConfig, name = 'JS Task Runner') { constructor(config: MainConfig, name = 'JS Task Runner') {
super({ super({
taskType: 'javascript', taskType: 'javascript',
@ -115,33 +99,14 @@ export class JsTaskRunner extends TaskRunner {
? neededBuiltInsResult.result ? neededBuiltInsResult.result
: BuiltInsParserState.newNeedsAllDataState(); : BuiltInsParserState.newNeedsAllDataState();
const data = await this.requestData<DataRequestResponse>( const dataResponse = await this.requestData<DataRequestResponse>(
task.taskId, task.taskId,
neededBuiltIns.toDataRequestParams(), neededBuiltIns.toDataRequestParams(),
); );
/** const data = this.reconstructTaskData(dataResponse);
* We request node types only when we know a task needs all nodes, because
* needing all nodes means that the task relies on paired item functionality,
* which is the same requirement for needing node types.
*/
if (neededBuiltIns.needsAllNodes) {
const uniqueNodeTypes = new Map(
data.workflow.nodes.map((node) => [
`${node.type}|${node.typeVersion}`,
{ name: node.type, version: node.typeVersion },
]),
);
const unknownNodeTypes = this.nodeTypes.onlyUnknown([...uniqueNodeTypes.values()]); await this.requestNodeTypeIfNeeded(neededBuiltIns, data.workflow, task.taskId);
const nodeTypes = await this.requestNodeTypes<INodeTypeDescription[]>(
task.taskId,
unknownNodeTypes,
);
this.nodeTypes.addNodeTypeDescriptions(nodeTypes);
}
const workflowParams = data.workflow; const workflowParams = data.workflow;
const workflow = new Workflow({ const workflow = new Workflow({
@ -201,7 +166,7 @@ export class JsTaskRunner extends TaskRunner {
private async runForAllItems( private async runForAllItems(
taskId: string, taskId: string,
settings: JSExecSettings, settings: JSExecSettings,
data: DataRequestResponse, data: JsTaskData,
workflow: Workflow, workflow: Workflow,
customConsole: CustomConsole, customConsole: CustomConsole,
): Promise<INodeExecutionData[]> { ): Promise<INodeExecutionData[]> {
@ -248,7 +213,7 @@ export class JsTaskRunner extends TaskRunner {
private async runForEachItem( private async runForEachItem(
taskId: string, taskId: string,
settings: JSExecSettings, settings: JSExecSettings,
data: DataRequestResponse, data: JsTaskData,
workflow: Workflow, workflow: Workflow,
customConsole: CustomConsole, customConsole: CustomConsole,
): Promise<INodeExecutionData[]> { ): Promise<INodeExecutionData[]> {
@ -315,7 +280,7 @@ export class JsTaskRunner extends TaskRunner {
return returnData; return returnData;
} }
private createDataProxy(data: DataRequestResponse, workflow: Workflow, itemIndex: number) { private createDataProxy(data: JsTaskData, workflow: Workflow, itemIndex: number) {
return new WorkflowDataProxy( return new WorkflowDataProxy(
workflow, workflow,
data.runExecutionData, data.runExecutionData,
@ -359,4 +324,43 @@ export class JsTaskRunner extends TaskRunner {
return new ExecutionError({ message: JSON.stringify(error) }); return new ExecutionError({ message: JSON.stringify(error) });
} }
private reconstructTaskData(response: DataRequestResponse): JsTaskData {
return {
...response,
connectionInputData: this.taskDataReconstruct.reconstructConnectionInputData(
response.inputData,
),
executeData: this.taskDataReconstruct.reconstructExecuteData(response),
};
}
private async requestNodeTypeIfNeeded(
neededBuiltIns: BuiltInsParserState,
workflow: JsTaskData['workflow'],
taskId: string,
) {
/**
* We request node types only when we know a task needs all nodes, because
* needing all nodes means that the task relies on paired item functionality,
* which is the same requirement for needing node types.
*/
if (neededBuiltIns.needsAllNodes) {
const uniqueNodeTypes = new Map(
workflow.nodes.map((node) => [
`${node.type}|${node.typeVersion}`,
{ name: node.type, version: node.typeVersion },
]),
);
const unknownNodeTypes = this.nodeTypes.onlyUnknown([...uniqueNodeTypes.values()]);
const nodeTypes = await this.requestNodeTypes<INodeTypeDescription[]>(
taskId,
unknownNodeTypes,
);
this.nodeTypes.addNodeTypeDescriptions(nodeTypes);
}
}
} }

View file

@ -8,6 +8,7 @@ import type {
INodeParameters, INodeParameters,
IRunExecutionData, IRunExecutionData,
ITaskDataConnections, ITaskDataConnections,
ITaskDataConnectionsSource,
IWorkflowExecuteAdditionalData, IWorkflowExecuteAdditionalData,
Workflow, Workflow,
WorkflowExecuteMode, WorkflowExecuteMode,
@ -29,17 +30,16 @@ export interface TaskDataRequestParams {
export interface DataRequestResponse { export interface DataRequestResponse {
workflow: Omit<WorkflowParameters, 'nodeTypes'>; workflow: Omit<WorkflowParameters, 'nodeTypes'>;
inputData: ITaskDataConnections; inputData: ITaskDataConnections;
connectionInputSource: ITaskDataConnectionsSource | null;
node: INode; node: INode;
runExecutionData: IRunExecutionData; runExecutionData: IRunExecutionData;
runIndex: number; runIndex: number;
itemIndex: number; itemIndex: number;
activeNodeName: string; activeNodeName: string;
connectionInputData: INodeExecutionData[];
siblingParameters: INodeParameters; siblingParameters: INodeParameters;
mode: WorkflowExecuteMode; mode: WorkflowExecuteMode;
envProviderState: EnvProviderState; envProviderState: EnvProviderState;
executeData?: IExecuteData;
defaultReturnRunIndex: number; defaultReturnRunIndex: number;
selfData: IDataObject; selfData: IDataObject;
contextNodeName: string; contextNodeName: string;

View file

@ -5,18 +5,6 @@ import type WebSocket from 'ws';
import type { TaskRunner } from './task-broker.service'; import type { TaskRunner } from './task-broker.service';
import type { AuthlessRequest } from '../requests'; import type { AuthlessRequest } from '../requests';
/**
* Specifies what data should be included for a task data request.
*/
export interface TaskDataRequestParams {
dataOfNodes: string[] | 'all';
prevNode: boolean;
/** Whether input data for the node should be included */
input: boolean;
/** Whether env provider's state should be included */
env: boolean;
}
export interface DisconnectAnalyzer { export interface DisconnectAnalyzer {
determineDisconnectReason(runnerId: TaskRunner['id']): Promise<Error>; determineDisconnectReason(runnerId: TaskRunner['id']): Promise<Error>;
} }

View file

@ -1,42 +1,10 @@
import type { TaskData } from '@n8n/task-runner'; import type { PartialAdditionalData, TaskData } from '@n8n/task-runner';
import { mock } from 'jest-mock-extended'; import { mock } from 'jest-mock-extended';
import type { IExecuteFunctions, IWorkflowExecuteAdditionalData } from 'n8n-workflow'; import type { Workflow } from 'n8n-workflow';
import { type INode, type INodeExecutionData, type Workflow } from 'n8n-workflow';
import { DataRequestResponseBuilder } from '../data-request-response-builder'; import { DataRequestResponseBuilder } from '../data-request-response-builder';
const triggerNode: INode = mock<INode>({ const additionalData = mock<PartialAdditionalData>({
name: 'Trigger',
});
const debugHelperNode: INode = mock<INode>({
name: 'DebugHelper',
});
const codeNode: INode = mock<INode>({
name: 'Code',
});
const workflow: TaskData['workflow'] = mock<Workflow>();
const debugHelperNodeOutItems: INodeExecutionData[] = [
{
json: {
uid: 'abb74fd4-bef2-4fae-9d53-ea24e9eb3032',
email: 'Dan.Schmidt31@yahoo.com',
firstname: 'Toni',
lastname: 'Schuster',
password: 'Q!D6C2',
},
pairedItem: {
item: 0,
},
},
];
const codeNodeInputItems: INodeExecutionData[] = debugHelperNodeOutItems;
const connectionInputData: TaskData['connectionInputData'] = codeNodeInputItems;
const envProviderState: TaskData['envProviderState'] = mock<TaskData['envProviderState']>({
env: {},
isEnvAccessBlocked: false,
isProcessAvailable: true,
});
const additionalData = mock<IWorkflowExecuteAdditionalData>({
formWaitingBaseUrl: 'http://localhost:5678/form-waiting', formWaitingBaseUrl: 'http://localhost:5678/form-waiting',
instanceBaseUrl: 'http://localhost:5678/', instanceBaseUrl: 'http://localhost:5678/',
restApiUrl: 'http://localhost:5678/rest', restApiUrl: 'http://localhost:5678/rest',
@ -50,275 +18,57 @@ const additionalData = mock<IWorkflowExecuteAdditionalData>({
executionTimeoutTimestamp: undefined, executionTimeoutTimestamp: undefined,
restartExecutionId: undefined, restartExecutionId: undefined,
}); });
const executeFunctions = mock<IExecuteFunctions>();
/** const workflow: TaskData['workflow'] = mock<Workflow>({
* Drawn with https://asciiflow.com/#/ id: '1',
* Task data for an execution of the following WF: name: 'Test Workflow',
* where denotes the currently being executing node. active: true,
* connectionsBySourceNode: {},
* nodes: {},
* Trigger DebugHelper Code pinData: {},
* settings: {},
*/ staticData: {},
const taskData: TaskData = { });
executeFunctions,
workflow, const taskData = mock<TaskData>({
connectionInputData,
inputData: {
main: [codeNodeInputItems],
},
itemIndex: 0,
activeNodeName: codeNode.name,
contextNodeName: codeNode.name,
defaultReturnRunIndex: -1,
mode: 'manual',
envProviderState,
node: codeNode,
runExecutionData: {
startData: {
destinationNode: codeNode.name,
runNodeFilter: [triggerNode.name, debugHelperNode.name, codeNode.name],
},
resultData: {
runData: {
[triggerNode.name]: [
{
hints: [],
startTime: 1730313407328,
executionTime: 1,
source: [],
executionStatus: 'success',
data: {
main: [[]],
},
},
],
[debugHelperNode.name]: [
{
hints: [],
startTime: 1730313407330,
executionTime: 1,
source: [
{
previousNode: triggerNode.name,
},
],
executionStatus: 'success',
data: {
main: [debugHelperNodeOutItems],
},
},
],
},
pinData: {},
},
executionData: {
contextData: {},
nodeExecutionStack: [],
metadata: {},
waitingExecution: {
[codeNode.name]: {
'0': {
main: [codeNodeInputItems],
},
},
},
waitingExecutionSource: {
[codeNode.name]: {
'0': {
main: [
{
previousNode: debugHelperNode.name,
},
],
},
},
},
},
},
runIndex: 0,
selfData: {},
siblingParameters: {},
executeData: {
node: codeNode,
data: {
main: [codeNodeInputItems],
},
source: {
main: [
{
previousNode: debugHelperNode.name,
previousNodeOutput: 0,
},
],
},
},
additionalData, additionalData,
} as const; workflow,
});
describe('DataRequestResponseBuilder', () => { describe('DataRequestResponseBuilder', () => {
const allDataParam: DataRequestResponseBuilder['requestParams'] = { const builder = new DataRequestResponseBuilder();
dataOfNodes: 'all',
env: true,
input: true,
prevNode: true,
};
const newRequestParam = (opts: Partial<DataRequestResponseBuilder['requestParams']>) => ({ it('picks only specific properties for additional data', () => {
...allDataParam, const result = builder.buildFromTaskData(taskData);
...opts,
});
describe('all data', () => { expect(result.additionalData).toStrictEqual({
it('should build the runExecutionData as is when everything is requested', () => { formWaitingBaseUrl: 'http://localhost:5678/form-waiting',
const dataRequestResponseBuilder = new DataRequestResponseBuilder(taskData, allDataParam); instanceBaseUrl: 'http://localhost:5678/',
restApiUrl: 'http://localhost:5678/rest',
const { runExecutionData } = dataRequestResponseBuilder.build(); variables: additionalData.variables,
webhookBaseUrl: 'http://localhost:5678/webhook',
expect(runExecutionData).toStrictEqual(taskData.runExecutionData); webhookTestBaseUrl: 'http://localhost:5678/webhook-test',
webhookWaitingBaseUrl: 'http://localhost:5678/webhook-waiting',
executionId: '45844',
userId: '114984bc-44b3-4dd4-9b54-a4a8d34d51d5',
currentNodeParameters: undefined,
executionTimeoutTimestamp: undefined,
restartExecutionId: undefined,
}); });
}); });
describe('envProviderState', () => { it('picks only specific properties for workflow', () => {
it("should filter out envProviderState when it's not requested", () => { const result = builder.buildFromTaskData(taskData);
const dataRequestResponseBuilder = new DataRequestResponseBuilder(
taskData,
newRequestParam({
env: false,
}),
);
const result = dataRequestResponseBuilder.build(); expect(result.workflow).toStrictEqual({
id: '1',
expect(result.envProviderState).toStrictEqual({ name: 'Test Workflow',
env: {}, active: true,
isEnvAccessBlocked: false, connections: workflow.connectionsBySourceNode,
isProcessAvailable: true, nodes: [],
}); pinData: workflow.pinData,
}); settings: workflow.settings,
}); staticData: workflow.staticData,
describe('additionalData', () => {
it('picks only specific properties for additional data', () => {
const dataRequestResponseBuilder = new DataRequestResponseBuilder(taskData, allDataParam);
const result = dataRequestResponseBuilder.build();
expect(result.additionalData).toStrictEqual({
formWaitingBaseUrl: 'http://localhost:5678/form-waiting',
instanceBaseUrl: 'http://localhost:5678/',
restApiUrl: 'http://localhost:5678/rest',
webhookBaseUrl: 'http://localhost:5678/webhook',
webhookTestBaseUrl: 'http://localhost:5678/webhook-test',
webhookWaitingBaseUrl: 'http://localhost:5678/webhook-waiting',
executionId: '45844',
userId: '114984bc-44b3-4dd4-9b54-a4a8d34d51d5',
currentNodeParameters: undefined,
executionTimeoutTimestamp: undefined,
restartExecutionId: undefined,
variables: additionalData.variables,
});
});
});
describe('input data', () => {
const allExceptInputParam = newRequestParam({
input: false,
});
it('drops input data from executeData', () => {
const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build();
expect(result.executeData).toStrictEqual({
node: taskData.executeData!.node,
source: taskData.executeData!.source,
data: {},
});
});
it('drops input data from result', () => {
const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build();
expect(result.inputData).toStrictEqual({});
});
it('drops input data from result', () => {
const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build();
expect(result.inputData).toStrictEqual({});
});
it('drops input data from connectionInputData', () => {
const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build();
expect(result.connectionInputData).toStrictEqual([]);
});
});
describe('nodes', () => {
it('should return empty run data when only Code node is requested', () => {
const result = new DataRequestResponseBuilder(
taskData,
newRequestParam({ dataOfNodes: ['Code'], prevNode: false }),
).build();
expect(result.runExecutionData.resultData.runData).toStrictEqual({});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
});
it('should return empty run data when only Code node is requested', () => {
const result = new DataRequestResponseBuilder(
taskData,
newRequestParam({ dataOfNodes: [codeNode.name], prevNode: false }),
).build();
expect(result.runExecutionData.resultData.runData).toStrictEqual({});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
});
it("should return only DebugHelper's data when only DebugHelper node is requested", () => {
const result = new DataRequestResponseBuilder(
taskData,
newRequestParam({ dataOfNodes: [debugHelperNode.name], prevNode: false }),
).build();
expect(result.runExecutionData.resultData.runData).toStrictEqual({
[debugHelperNode.name]: taskData.runExecutionData.resultData.runData[debugHelperNode.name],
});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
});
it("should return DebugHelper's data when only prevNode node is requested", () => {
const result = new DataRequestResponseBuilder(
taskData,
newRequestParam({ dataOfNodes: [], prevNode: true }),
).build();
expect(result.runExecutionData.resultData.runData).toStrictEqual({
[debugHelperNode.name]: taskData.runExecutionData.resultData.runData[debugHelperNode.name],
});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
}); });
}); });
}); });

View file

@ -0,0 +1,300 @@
import type { DataRequestResponse, TaskDataRequestParams } from '@n8n/task-runner';
import { mock } from 'jest-mock-extended';
import type { IWorkflowExecuteAdditionalData } from 'n8n-workflow';
import { type INode, type INodeExecutionData } from 'n8n-workflow';
import { DataRequestResponseStripper } from '../data-request-response-stripper';
const triggerNode: INode = mock<INode>({
name: 'Trigger',
});
const debugHelperNode: INode = mock<INode>({
name: 'DebugHelper',
});
const codeNode: INode = mock<INode>({
name: 'Code',
});
const workflow: DataRequestResponse['workflow'] = mock<DataRequestResponse['workflow']>();
const debugHelperNodeOutItems: INodeExecutionData[] = [
{
json: {
uid: 'abb74fd4-bef2-4fae-9d53-ea24e9eb3032',
email: 'Dan.Schmidt31@yahoo.com',
firstname: 'Toni',
lastname: 'Schuster',
password: 'Q!D6C2',
},
pairedItem: {
item: 0,
},
},
];
const codeNodeInputItems: INodeExecutionData[] = debugHelperNodeOutItems;
const envProviderState: DataRequestResponse['envProviderState'] = mock<
DataRequestResponse['envProviderState']
>({
env: {},
isEnvAccessBlocked: false,
isProcessAvailable: true,
});
const additionalData = mock<IWorkflowExecuteAdditionalData>({
formWaitingBaseUrl: 'http://localhost:5678/form-waiting',
instanceBaseUrl: 'http://localhost:5678/',
restApiUrl: 'http://localhost:5678/rest',
variables: {},
webhookBaseUrl: 'http://localhost:5678/webhook',
webhookTestBaseUrl: 'http://localhost:5678/webhook-test',
webhookWaitingBaseUrl: 'http://localhost:5678/webhook-waiting',
executionId: '45844',
userId: '114984bc-44b3-4dd4-9b54-a4a8d34d51d5',
currentNodeParameters: undefined,
executionTimeoutTimestamp: undefined,
restartExecutionId: undefined,
});
/**
* Drawn with https://asciiflow.com/#/
* Task data for an execution of the following WF:
* where denotes the currently being executing node.
*
*
* Trigger DebugHelper Code
*
*/
const taskData: DataRequestResponse = {
workflow,
inputData: {
main: [codeNodeInputItems],
},
itemIndex: 0,
activeNodeName: codeNode.name,
contextNodeName: codeNode.name,
defaultReturnRunIndex: -1,
mode: 'manual',
envProviderState,
node: codeNode,
runExecutionData: {
startData: {
destinationNode: codeNode.name,
runNodeFilter: [triggerNode.name, debugHelperNode.name, codeNode.name],
},
resultData: {
runData: {
[triggerNode.name]: [
{
hints: [],
startTime: 1730313407328,
executionTime: 1,
source: [],
executionStatus: 'success',
data: {
main: [[]],
},
},
],
[debugHelperNode.name]: [
{
hints: [],
startTime: 1730313407330,
executionTime: 1,
source: [
{
previousNode: triggerNode.name,
},
],
executionStatus: 'success',
data: {
main: [debugHelperNodeOutItems],
},
},
],
},
pinData: {},
},
executionData: {
contextData: {},
nodeExecutionStack: [],
metadata: {},
waitingExecution: {
[codeNode.name]: {
'0': {
main: [codeNodeInputItems],
},
},
},
waitingExecutionSource: {
[codeNode.name]: {
'0': {
main: [
{
previousNode: debugHelperNode.name,
},
],
},
},
},
},
},
runIndex: 0,
selfData: {},
siblingParameters: {},
connectionInputSource: {
main: [
{
previousNode: debugHelperNode.name,
previousNodeOutput: 0,
},
],
},
additionalData,
} as const;
describe('DataRequestResponseStripper', () => {
const allDataParam: TaskDataRequestParams = {
dataOfNodes: 'all',
env: true,
input: true,
prevNode: true,
};
const newRequestParam = (opts: Partial<TaskDataRequestParams>) => ({
...allDataParam,
...opts,
});
describe('all data', () => {
it('should build the runExecutionData as is when everything is requested', () => {
const dataRequestResponseBuilder = new DataRequestResponseStripper(taskData, allDataParam);
const { runExecutionData } = dataRequestResponseBuilder.strip();
expect(runExecutionData).toStrictEqual(taskData.runExecutionData);
});
});
describe('envProviderState', () => {
it("should filter out envProviderState when it's not requested", () => {
const dataRequestResponseBuilder = new DataRequestResponseStripper(
taskData,
newRequestParam({
env: false,
}),
);
const result = dataRequestResponseBuilder.strip();
expect(result.envProviderState).toStrictEqual({
env: {},
isEnvAccessBlocked: false,
isProcessAvailable: true,
});
});
});
describe('input data', () => {
const allExceptInputParam = newRequestParam({
input: false,
});
it('drops input data from result', () => {
const result = new DataRequestResponseStripper(taskData, allExceptInputParam).strip();
expect(result.inputData).toStrictEqual({});
});
it('drops input data from result', () => {
const result = new DataRequestResponseStripper(taskData, allExceptInputParam).strip();
expect(result.inputData).toStrictEqual({});
});
});
describe('nodes', () => {
it('should return empty run data when only Code node is requested', () => {
const result = new DataRequestResponseStripper(
taskData,
newRequestParam({ dataOfNodes: ['Code'], prevNode: false }),
).strip();
expect(result.runExecutionData.resultData.runData).toStrictEqual({});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
});
it('should return empty run data when only Code node is requested', () => {
const result = new DataRequestResponseStripper(
taskData,
newRequestParam({ dataOfNodes: [codeNode.name], prevNode: false }),
).strip();
expect(result.runExecutionData.resultData.runData).toStrictEqual({});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
});
it("should return only DebugHelper's data when only DebugHelper node is requested", () => {
const result = new DataRequestResponseStripper(
taskData,
newRequestParam({ dataOfNodes: [debugHelperNode.name], prevNode: false }),
).strip();
expect(result.runExecutionData.resultData.runData).toStrictEqual({
[debugHelperNode.name]: taskData.runExecutionData.resultData.runData[debugHelperNode.name],
});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
});
it("should return DebugHelper's data when only prevNode node is requested", () => {
const result = new DataRequestResponseStripper(
taskData,
newRequestParam({ dataOfNodes: [], prevNode: true }),
).strip();
expect(result.runExecutionData.resultData.runData).toStrictEqual({
[debugHelperNode.name]: taskData.runExecutionData.resultData.runData[debugHelperNode.name],
});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
});
});
describe('passthrough properties', () => {
test.each<Array<keyof DataRequestResponse>>([
['workflow'],
['connectionInputSource'],
['node'],
['runIndex'],
['itemIndex'],
['activeNodeName'],
['siblingParameters'],
['mode'],
['defaultReturnRunIndex'],
['selfData'],
['contextNodeName'],
['additionalData'],
])("it doesn't change %s", (propertyName) => {
const dataRequestResponseBuilder = new DataRequestResponseStripper(taskData, allDataParam);
const result = dataRequestResponseBuilder.strip();
expect(result[propertyName]).toBe(taskData[propertyName]);
});
});
});

View file

@ -1,63 +1,30 @@
import type { import type { DataRequestResponse, PartialAdditionalData, TaskData } from '@n8n/task-runner';
DataRequestResponse, import type { IWorkflowExecuteAdditionalData, Workflow, WorkflowParameters } from 'n8n-workflow';
BrokerMessage,
PartialAdditionalData,
TaskData,
} from '@n8n/task-runner';
import type {
EnvProviderState,
IExecuteData,
INodeExecutionData,
IPinData,
IRunData,
IRunExecutionData,
ITaskDataConnections,
IWorkflowExecuteAdditionalData,
Workflow,
WorkflowParameters,
} from 'n8n-workflow';
/** /**
* Builds the response to a data request coming from a Task Runner. Tries to minimize * Transforms TaskData to DataRequestResponse. The main purpose of the
* the amount of data that is sent to the runner by only providing what is requested. * transformation is to make sure there is no duplication in the data
* (e.g. connectionInputData and executeData.data can be derived from
* inputData).
*/ */
export class DataRequestResponseBuilder { export class DataRequestResponseBuilder {
private requestedNodeNames = new Set<string>(); buildFromTaskData(taskData: TaskData): DataRequestResponse {
constructor(
private readonly taskData: TaskData,
private readonly requestParams: BrokerMessage.ToRequester.TaskDataRequest['requestParams'],
) {
this.requestedNodeNames = new Set(requestParams.dataOfNodes);
if (this.requestParams.prevNode && this.requestParams.dataOfNodes !== 'all') {
this.requestedNodeNames.add(this.determinePrevNodeName());
}
}
/**
* Builds a response to the data request
*/
build(): DataRequestResponse {
const { taskData: td } = this;
return { return {
workflow: this.buildWorkflow(td.workflow), workflow: this.buildWorkflow(taskData.workflow),
connectionInputData: this.buildConnectionInputData(td.connectionInputData), inputData: taskData.inputData,
inputData: this.buildInputData(td.inputData), connectionInputSource: taskData.executeData?.source ?? null,
itemIndex: td.itemIndex, itemIndex: taskData.itemIndex,
activeNodeName: td.activeNodeName, activeNodeName: taskData.activeNodeName,
contextNodeName: td.contextNodeName, contextNodeName: taskData.contextNodeName,
defaultReturnRunIndex: td.defaultReturnRunIndex, defaultReturnRunIndex: taskData.defaultReturnRunIndex,
mode: td.mode, mode: taskData.mode,
envProviderState: this.buildEnvProviderState(td.envProviderState), envProviderState: taskData.envProviderState,
node: td.node, // The current node being executed node: taskData.node,
runExecutionData: this.buildRunExecutionData(td.runExecutionData), runExecutionData: taskData.runExecutionData,
runIndex: td.runIndex, runIndex: taskData.runIndex,
selfData: td.selfData, selfData: taskData.selfData,
siblingParameters: td.siblingParameters, siblingParameters: taskData.siblingParameters,
executeData: this.buildExecuteData(td.executeData), additionalData: this.buildAdditionalData(taskData.additionalData),
additionalData: this.buildAdditionalData(td.additionalData),
}; };
} }
@ -80,86 +47,6 @@ export class DataRequestResponseBuilder {
}; };
} }
private buildExecuteData(executeData: IExecuteData | undefined): IExecuteData | undefined {
if (executeData === undefined) {
return undefined;
}
return {
node: executeData.node, // The current node being executed
data: this.requestParams.input ? executeData.data : {},
source: executeData.source,
};
}
private buildRunExecutionData(runExecutionData: IRunExecutionData): IRunExecutionData {
if (this.requestParams.dataOfNodes === 'all') {
return runExecutionData;
}
return {
startData: runExecutionData.startData,
resultData: {
error: runExecutionData.resultData.error,
lastNodeExecuted: runExecutionData.resultData.lastNodeExecuted,
metadata: runExecutionData.resultData.metadata,
runData: this.buildRunData(runExecutionData.resultData.runData),
pinData: this.buildPinData(runExecutionData.resultData.pinData),
},
executionData: runExecutionData.executionData
? {
// TODO: Figure out what these two are and can they be filtered
contextData: runExecutionData.executionData?.contextData,
nodeExecutionStack: runExecutionData.executionData.nodeExecutionStack,
metadata: runExecutionData.executionData.metadata,
waitingExecution: runExecutionData.executionData.waitingExecution,
waitingExecutionSource: runExecutionData.executionData.waitingExecutionSource,
}
: undefined,
};
}
private buildRunData(runData: IRunData): IRunData {
return this.filterObjectByNodeNames(runData);
}
private buildPinData(pinData: IPinData | undefined): IPinData | undefined {
return pinData ? this.filterObjectByNodeNames(pinData) : undefined;
}
private buildEnvProviderState(envProviderState: EnvProviderState): EnvProviderState {
if (this.requestParams.env) {
// In case `isEnvAccessBlocked` = true, the provider state has already sanitized
// the environment variables and we can return it as is.
return envProviderState;
}
return {
env: {},
isEnvAccessBlocked: envProviderState.isEnvAccessBlocked,
isProcessAvailable: envProviderState.isProcessAvailable,
};
}
private buildInputData(inputData: ITaskDataConnections): ITaskDataConnections {
if (this.requestParams.input) {
return inputData;
}
return {};
}
private buildConnectionInputData(
connectionInputData: INodeExecutionData[],
): INodeExecutionData[] {
if (this.requestParams.input) {
return connectionInputData;
}
return [];
}
private buildWorkflow(workflow: Workflow): Omit<WorkflowParameters, 'nodeTypes'> { private buildWorkflow(workflow: Workflow): Omit<WorkflowParameters, 'nodeTypes'> {
return { return {
id: workflow.id, id: workflow.id,
@ -172,37 +59,4 @@ export class DataRequestResponseBuilder {
staticData: workflow.staticData, staticData: workflow.staticData,
}; };
} }
/**
* Assuming the given `obj` is an object where the keys are node names,
* filters the object to only include the node names that are requested.
*/
private filterObjectByNodeNames<T extends Record<string, unknown>>(obj: T): T {
if (this.requestParams.dataOfNodes === 'all') {
return obj;
}
const filteredObj: T = {} as T;
for (const nodeName in obj) {
if (!Object.prototype.hasOwnProperty.call(obj, nodeName)) {
continue;
}
if (this.requestedNodeNames.has(nodeName)) {
filteredObj[nodeName] = obj[nodeName];
}
}
return filteredObj;
}
private determinePrevNodeName(): string {
const sourceData = this.taskData.executeData?.source?.main?.[0];
if (!sourceData) {
return '';
}
return sourceData.previousNode;
}
} }

View file

@ -0,0 +1,131 @@
import type { DataRequestResponse, BrokerMessage } from '@n8n/task-runner';
import type {
EnvProviderState,
IPinData,
IRunData,
IRunExecutionData,
ITaskDataConnections,
} from 'n8n-workflow';
/**
* Strips data from data request response based on the specified parameters
*/
export class DataRequestResponseStripper {
private requestedNodeNames = new Set<string>();
constructor(
private readonly dataResponse: DataRequestResponse,
private readonly stripParams: BrokerMessage.ToRequester.TaskDataRequest['requestParams'],
) {
this.requestedNodeNames = new Set(stripParams.dataOfNodes);
if (this.stripParams.prevNode && this.stripParams.dataOfNodes !== 'all') {
this.requestedNodeNames.add(this.determinePrevNodeName());
}
}
/**
* Builds a response to the data request
*/
strip(): DataRequestResponse {
const { dataResponse: dr } = this;
return {
...dr,
inputData: this.stripInputData(dr.inputData),
envProviderState: this.stripEnvProviderState(dr.envProviderState),
runExecutionData: this.stripRunExecutionData(dr.runExecutionData),
};
}
private stripRunExecutionData(runExecutionData: IRunExecutionData): IRunExecutionData {
if (this.stripParams.dataOfNodes === 'all') {
return runExecutionData;
}
return {
startData: runExecutionData.startData,
resultData: {
error: runExecutionData.resultData.error,
lastNodeExecuted: runExecutionData.resultData.lastNodeExecuted,
metadata: runExecutionData.resultData.metadata,
runData: this.stripRunData(runExecutionData.resultData.runData),
pinData: this.stripPinData(runExecutionData.resultData.pinData),
},
executionData: runExecutionData.executionData
? {
// TODO: Figure out what these two are and can they be stripped
contextData: runExecutionData.executionData?.contextData,
nodeExecutionStack: runExecutionData.executionData.nodeExecutionStack,
metadata: runExecutionData.executionData.metadata,
waitingExecution: runExecutionData.executionData.waitingExecution,
waitingExecutionSource: runExecutionData.executionData.waitingExecutionSource,
}
: undefined,
};
}
private stripRunData(runData: IRunData): IRunData {
return this.filterObjectByNodeNames(runData);
}
private stripPinData(pinData: IPinData | undefined): IPinData | undefined {
return pinData ? this.filterObjectByNodeNames(pinData) : undefined;
}
private stripEnvProviderState(envProviderState: EnvProviderState): EnvProviderState {
if (this.stripParams.env) {
// In case `isEnvAccessBlocked` = true, the provider state has already sanitized
// the environment variables and we can return it as is.
return envProviderState;
}
return {
env: {},
isEnvAccessBlocked: envProviderState.isEnvAccessBlocked,
isProcessAvailable: envProviderState.isProcessAvailable,
};
}
private stripInputData(inputData: ITaskDataConnections): ITaskDataConnections {
if (this.stripParams.input) {
return inputData;
}
return {};
}
/**
* Assuming the given `obj` is an object where the keys are node names,
* filters the object to only include the node names that are requested.
*/
private filterObjectByNodeNames<T extends Record<string, unknown>>(obj: T): T {
if (this.stripParams.dataOfNodes === 'all') {
return obj;
}
const filteredObj: T = {} as T;
for (const nodeName in obj) {
if (!Object.prototype.hasOwnProperty.call(obj, nodeName)) {
continue;
}
if (this.requestedNodeNames.has(nodeName)) {
filteredObj[nodeName] = obj[nodeName];
}
}
return filteredObj;
}
private determinePrevNodeName(): string {
const sourceData = this.dataResponse.connectionInputSource?.main?.[0];
if (!sourceData) {
return '';
}
return sourceData.previousNode;
}
}

View file

@ -1,5 +1,6 @@
import { TaskRunnersConfig } from '@n8n/config';
import type { TaskResultData, RequesterMessage, BrokerMessage, TaskData } from '@n8n/task-runner'; import type { TaskResultData, RequesterMessage, BrokerMessage, TaskData } from '@n8n/task-runner';
import { RPC_ALLOW_LIST } from '@n8n/task-runner'; import { DataRequestResponseReconstruct, RPC_ALLOW_LIST } from '@n8n/task-runner';
import type { import type {
EnvProviderState, EnvProviderState,
IExecuteFunctions, IExecuteFunctions,
@ -17,11 +18,13 @@ import type {
} from 'n8n-workflow'; } from 'n8n-workflow';
import { createResultOk, createResultError } from 'n8n-workflow'; import { createResultOk, createResultError } from 'n8n-workflow';
import { nanoid } from 'nanoid'; import { nanoid } from 'nanoid';
import { Service } from 'typedi'; import * as a from 'node:assert/strict';
import Container, { Service } from 'typedi';
import { NodeTypes } from '@/node-types'; import { NodeTypes } from '@/node-types';
import { DataRequestResponseBuilder } from './data-request-response-builder'; import { DataRequestResponseBuilder } from './data-request-response-builder';
import { DataRequestResponseStripper } from './data-request-response-stripper';
export type RequestAccept = (jobId: string) => void; export type RequestAccept = (jobId: string) => void;
export type RequestReject = (reason: string) => void; export type RequestReject = (reason: string) => void;
@ -56,6 +59,10 @@ export abstract class TaskManager {
tasks: Map<string, Task> = new Map(); tasks: Map<string, Task> = new Map();
private readonly runnerConfig = Container.get(TaskRunnersConfig);
private readonly dataResponseBuilder = new DataRequestResponseBuilder();
constructor(private readonly nodeTypes: NodeTypes) {} constructor(private readonly nodeTypes: NodeTypes) {}
async startTask<TData, TError>( async startTask<TData, TError>(
@ -237,14 +244,30 @@ export abstract class TaskManager {
return; return;
} }
const dataRequestResponseBuilder = new DataRequestResponseBuilder(job.data, requestParams); const dataRequestResponse = this.dataResponseBuilder.buildFromTaskData(job.data);
const requestedData = dataRequestResponseBuilder.build();
if (this.runnerConfig.assertDeduplicationOutput) {
const reconstruct = new DataRequestResponseReconstruct();
a.deepStrictEqual(
reconstruct.reconstructConnectionInputData(dataRequestResponse.inputData),
job.data.connectionInputData,
);
a.deepStrictEqual(
reconstruct.reconstructExecuteData(dataRequestResponse),
job.data.executeData,
);
}
const strippedData = new DataRequestResponseStripper(
dataRequestResponse,
requestParams,
).strip();
this.sendMessage({ this.sendMessage({
type: 'requester:taskdataresponse', type: 'requester:taskdataresponse',
taskId, taskId,
requestId, requestId,
data: requestedData, data: strippedData,
}); });
} }