feat: Only send needed data to task runner (no-changelog) (#11487)

This commit is contained in:
Tomi Turtiainen 2024-11-04 11:13:09 +02:00 committed by GitHub
parent 2104fa1733
commit e4aa1d01f3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 1511 additions and 252 deletions

View file

@ -23,8 +23,10 @@
],
"dependencies": {
"@n8n/config": "workspace:*",
"n8n-workflow": "workspace:*",
"acorn": "8.14.0",
"acorn-walk": "8.3.4",
"n8n-core": "workspace:*",
"n8n-workflow": "workspace:*",
"nanoid": "^3.3.6",
"typedi": "catalog:",
"ws": "^8.18.0"

View file

@ -4,14 +4,11 @@ import fs from 'node:fs';
import { builtinModules } from 'node:module';
import { ValidationError } from '@/js-task-runner/errors/validation-error';
import {
JsTaskRunner,
type AllCodeTaskData,
type JSExecSettings,
} from '@/js-task-runner/js-task-runner';
import type { DataRequestResponse, JSExecSettings } from '@/js-task-runner/js-task-runner';
import { JsTaskRunner } from '@/js-task-runner/js-task-runner';
import type { Task } from '@/task-runner';
import { newAllCodeTaskData, newTaskWithSettings, withPairedItem, wrapIntoJson } from './test-data';
import { newCodeTaskData, newTaskWithSettings, withPairedItem, wrapIntoJson } from './test-data';
import type { JsRunnerConfig } from '../../config/js-runner-config';
import { MainConfig } from '../../config/main-config';
import { ExecutionError } from '../errors/execution-error';
@ -43,7 +40,7 @@ describe('JsTaskRunner', () => {
runner = defaultTaskRunner,
}: {
task: Task<JSExecSettings>;
taskData: AllCodeTaskData;
taskData: DataRequestResponse;
runner?: JsTaskRunner;
}) => {
jest.spyOn(runner, 'requestData').mockResolvedValue(taskData);
@ -71,7 +68,7 @@ describe('JsTaskRunner', () => {
nodeMode: 'runOnceForAllItems',
...settings,
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson)),
taskData: newCodeTaskData(inputItems.map(wrapIntoJson)),
runner,
});
};
@ -94,7 +91,7 @@ describe('JsTaskRunner', () => {
nodeMode: 'runOnceForEachItem',
...settings,
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson)),
taskData: newCodeTaskData(inputItems.map(wrapIntoJson)),
runner,
});
};
@ -111,7 +108,7 @@ describe('JsTaskRunner', () => {
await execTaskWithParams({
task,
taskData: newAllCodeTaskData([wrapIntoJson({})]),
taskData: newCodeTaskData([wrapIntoJson({})]),
});
expect(defaultTaskRunner.makeRpcCall).toHaveBeenCalledWith(task.taskId, 'logNodeOutput', [
@ -246,7 +243,7 @@ describe('JsTaskRunner', () => {
code: 'return { val: $env.VAR1 }',
nodeMode: 'runOnceForAllItems',
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson), {
taskData: newCodeTaskData(inputItems.map(wrapIntoJson), {
envProviderState: {
isEnvAccessBlocked: false,
isProcessAvailable: true,
@ -265,7 +262,7 @@ describe('JsTaskRunner', () => {
code: 'return { val: $env.VAR1 }',
nodeMode: 'runOnceForAllItems',
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson), {
taskData: newCodeTaskData(inputItems.map(wrapIntoJson), {
envProviderState: {
isEnvAccessBlocked: true,
isProcessAvailable: true,
@ -282,7 +279,7 @@ describe('JsTaskRunner', () => {
code: 'return Object.values($env).concat(Object.keys($env))',
nodeMode: 'runOnceForAllItems',
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson), {
taskData: newCodeTaskData(inputItems.map(wrapIntoJson), {
envProviderState: {
isEnvAccessBlocked: false,
isProcessAvailable: true,
@ -301,7 +298,7 @@ describe('JsTaskRunner', () => {
code: 'return { val: $env.N8N_RUNNERS_N8N_URI }',
nodeMode: 'runOnceForAllItems',
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson), {
taskData: newCodeTaskData(inputItems.map(wrapIntoJson), {
envProviderState: undefined,
}),
});
@ -316,7 +313,7 @@ describe('JsTaskRunner', () => {
code: 'return { val: Buffer.from("test-buffer").toString() }',
nodeMode: 'runOnceForAllItems',
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson), {
taskData: newCodeTaskData(inputItems.map(wrapIntoJson), {
envProviderState: undefined,
}),
});
@ -328,7 +325,7 @@ describe('JsTaskRunner', () => {
code: 'return { val: Buffer.from("test-buffer").toString() }',
nodeMode: 'runOnceForEachItem',
}),
taskData: newAllCodeTaskData(inputItems.map(wrapIntoJson), {
taskData: newCodeTaskData(inputItems.map(wrapIntoJson), {
envProviderState: undefined,
}),
});
@ -774,7 +771,7 @@ describe('JsTaskRunner', () => {
code: 'unknown',
nodeMode,
}),
taskData: newAllCodeTaskData([wrapIntoJson({ a: 1 })]),
taskData: newCodeTaskData([wrapIntoJson({ a: 1 })]),
}),
).rejects.toThrow(ExecutionError);
},
@ -796,7 +793,7 @@ describe('JsTaskRunner', () => {
jest.spyOn(runner, 'sendOffers').mockImplementation(() => {});
jest
.spyOn(runner, 'requestData')
.mockResolvedValue(newAllCodeTaskData([wrapIntoJson({ a: 1 })]));
.mockResolvedValue(newCodeTaskData([wrapIntoJson({ a: 1 })]));
await runner.receivedSettings(taskId, task.settings);

View file

@ -2,7 +2,7 @@ import type { IDataObject, INode, INodeExecutionData, ITaskData } from 'n8n-work
import { NodeConnectionType } from 'n8n-workflow';
import { nanoid } from 'nanoid';
import type { AllCodeTaskData, JSExecSettings } from '@/js-task-runner/js-task-runner';
import type { DataRequestResponse, JSExecSettings } from '@/js-task-runner/js-task-runner';
import type { Task } from '@/task-runner';
/**
@ -48,10 +48,10 @@ export const newTaskData = (opts: Partial<ITaskData> & Pick<ITaskData, 'source'>
/**
* Creates a new all code task data with the given options
*/
export const newAllCodeTaskData = (
export const newCodeTaskData = (
codeNodeInputData: INodeExecutionData[],
opts: Partial<AllCodeTaskData> = {},
): AllCodeTaskData => {
opts: Partial<DataRequestResponse> = {},
): DataRequestResponse => {
const codeNode = newNode({
name: 'JsCode',
parameters: {

View file

@ -0,0 +1,117 @@
import { BuiltInsParserState } from '../built-ins-parser-state';
describe('BuiltInsParserState', () => {
describe('toDataRequestSpecification', () => {
it('should return empty array when no properties are marked as needed', () => {
const state = new BuiltInsParserState();
expect(state.toDataRequestParams()).toEqual({
dataOfNodes: [],
env: false,
input: false,
prevNode: false,
});
});
it('should return all nodes and input when markNeedsAllNodes is called', () => {
const state = new BuiltInsParserState();
state.markNeedsAllNodes();
expect(state.toDataRequestParams()).toEqual({
dataOfNodes: 'all',
env: false,
input: true,
prevNode: false,
});
});
it('should return specific node names when nodes are marked as needed individually', () => {
const state = new BuiltInsParserState();
state.markNodeAsNeeded('Node1');
state.markNodeAsNeeded('Node2');
expect(state.toDataRequestParams()).toEqual({
dataOfNodes: ['Node1', 'Node2'],
env: false,
input: false,
prevNode: false,
});
});
it('should ignore individual nodes when needsAllNodes is marked as true', () => {
const state = new BuiltInsParserState();
state.markNodeAsNeeded('Node1');
state.markNeedsAllNodes();
state.markNodeAsNeeded('Node2'); // should be ignored since all nodes are needed
expect(state.toDataRequestParams()).toEqual({
dataOfNodes: 'all',
env: false,
input: true,
prevNode: false,
});
});
it('should mark env as needed when markEnvAsNeeded is called', () => {
const state = new BuiltInsParserState();
state.markEnvAsNeeded();
expect(state.toDataRequestParams()).toEqual({
dataOfNodes: [],
env: true,
input: false,
prevNode: false,
});
});
it('should mark input as needed when markInputAsNeeded is called', () => {
const state = new BuiltInsParserState();
state.markInputAsNeeded();
expect(state.toDataRequestParams()).toEqual({
dataOfNodes: [],
env: false,
input: true,
prevNode: false,
});
});
it('should mark prevNode as needed when markPrevNodeAsNeeded is called', () => {
const state = new BuiltInsParserState();
state.markPrevNodeAsNeeded();
expect(state.toDataRequestParams()).toEqual({
dataOfNodes: [],
env: false,
input: false,
prevNode: true,
});
});
it('should return correct specification when multiple properties are marked as needed', () => {
const state = new BuiltInsParserState();
state.markNeedsAllNodes();
state.markEnvAsNeeded();
state.markInputAsNeeded();
state.markPrevNodeAsNeeded();
expect(state.toDataRequestParams()).toEqual({
dataOfNodes: 'all',
env: true,
input: true,
prevNode: true,
});
});
it('should return correct specification when all properties are marked as needed', () => {
const state = BuiltInsParserState.newNeedsAllDataState();
expect(state.toDataRequestParams()).toEqual({
dataOfNodes: 'all',
env: true,
input: true,
prevNode: true,
});
});
});
});

View file

@ -0,0 +1,251 @@
import { getAdditionalKeys } from 'n8n-core';
import type { IDataObject, INodeType, IWorkflowExecuteAdditionalData } from 'n8n-workflow';
import { Workflow, WorkflowDataProxy } from 'n8n-workflow';
import { newCodeTaskData } from '../../__tests__/test-data';
import { BuiltInsParser } from '../built-ins-parser';
import { BuiltInsParserState } from '../built-ins-parser-state';
describe('BuiltInsParser', () => {
const parser = new BuiltInsParser();
const parseAndExpectOk = (code: string) => {
const result = parser.parseUsedBuiltIns(code);
if (!result.ok) {
fail(result.error);
}
return result.result;
};
describe('Env, input, execution and prevNode', () => {
const cases: Array<[string, BuiltInsParserState]> = [
['$env', new BuiltInsParserState({ needs$env: true })],
['$execution', new BuiltInsParserState({ needs$execution: true })],
['$prevNode', new BuiltInsParserState({ needs$prevNode: true })],
];
test.each(cases)("should identify built-ins in '%s'", (code, expected) => {
const state = parseAndExpectOk(code);
expect(state).toEqual(expected);
});
});
describe('Input', () => {
it('should mark input as needed when $input is used', () => {
const state = parseAndExpectOk(`
$input.item.json.age = 10 + Math.floor(Math.random() * 30);
$input.item.json.password = $input.item.json.password.split('').map(() => '*').join("")
delete $input.item.json.lastname
const emailParts = $input.item.json.email.split("@")
$input.item.json.emailData = {
user: emailParts[0],
domain: emailParts[1]
}
return $input.item;
`);
expect(state).toEqual(new BuiltInsParserState({ needs$input: true }));
});
it('should mark input as needed when $json is used', () => {
const state = parseAndExpectOk(`
$json.age = 10 + Math.floor(Math.random() * 30);
return $json;
`);
expect(state).toEqual(new BuiltInsParserState({ needs$input: true }));
});
});
describe('$(...)', () => {
const cases: Array<[string, BuiltInsParserState]> = [
[
'$("nodeName").first()',
new BuiltInsParserState({ neededNodeNames: new Set(['nodeName']) }),
],
[
'$("nodeName").all(); $("secondNode").matchingItem()',
new BuiltInsParserState({ neededNodeNames: new Set(['nodeName', 'secondNode']) }),
],
];
test.each(cases)("should identify nodes in '%s'", (code, expected) => {
const state = parseAndExpectOk(code);
expect(state).toEqual(expected);
});
it('should need all nodes when $() is called with a variable', () => {
const state = parseAndExpectOk('var n = "name"; $(n)');
expect(state).toEqual(new BuiltInsParserState({ needsAllNodes: true, needs$input: true }));
});
it('should require all nodes when there are multiple usages of $() and one is with a variable', () => {
const state = parseAndExpectOk(`
$("nodeName");
$("secondNode");
var n = "name";
$(n)
`);
expect(state).toEqual(new BuiltInsParserState({ needsAllNodes: true, needs$input: true }));
});
test.each([
['without parameters', '$()'],
['number literal', '$(123)'],
])('should ignore when $ is called %s', (_, code) => {
const state = parseAndExpectOk(code);
expect(state).toEqual(new BuiltInsParserState());
});
test.each([
'$("node").item',
'$("node")["item"]',
'$("node").pairedItem()',
'$("node")["pairedItem"]()',
'$("node").itemMatching(0)',
'$("node")["itemMatching"](0)',
'$("node")[variable]',
'var a = $("node")',
'let a = $("node")',
'const a = $("node")',
'a = $("node")',
])('should require all nodes if %s is used', (code) => {
const state = parseAndExpectOk(code);
expect(state).toEqual(new BuiltInsParserState({ needsAllNodes: true, needs$input: true }));
});
test.each(['$("node").first()', '$("node").last()', '$("node").all()', '$("node").params'])(
'should require only accessed node if %s is used',
(code) => {
const state = parseAndExpectOk(code);
expect(state).toEqual(
new BuiltInsParserState({
needsAllNodes: false,
neededNodeNames: new Set(['node']),
}),
);
},
);
});
describe('ECMAScript syntax', () => {
describe('ES2020', () => {
it('should parse optional chaining', () => {
parseAndExpectOk(`
const a = { b: { c: 1 } };
return a.b?.c;
`);
});
it('should parse nullish coalescing', () => {
parseAndExpectOk(`
const a = null;
return a ?? 1;
`);
});
});
describe('ES2021', () => {
it('should parse numeric separators', () => {
parseAndExpectOk(`
const a = 1_000_000;
return a;
`);
});
});
});
describe('WorkflowDataProxy built-ins', () => {
it('should have a known list of built-ins', () => {
const data = newCodeTaskData([]);
const dataProxy = new WorkflowDataProxy(
new Workflow({
...data.workflow,
nodeTypes: {
getByName() {
return undefined as unknown as INodeType;
},
getByNameAndVersion() {
return undefined as unknown as INodeType;
},
getKnownTypes() {
return undefined as unknown as IDataObject;
},
},
}),
data.runExecutionData,
data.runIndex,
0,
data.activeNodeName,
data.connectionInputData,
data.siblingParameters,
data.mode,
getAdditionalKeys(
data.additionalData as IWorkflowExecuteAdditionalData,
data.mode,
data.runExecutionData,
),
data.executeData,
data.defaultReturnRunIndex,
data.selfData,
data.contextNodeName,
// Make sure that even if we don't receive the envProviderState for
// whatever reason, we don't expose the task runner's env to the code
data.envProviderState ?? {
env: {},
isEnvAccessBlocked: false,
isProcessAvailable: true,
},
).getDataProxy({ throwOnMissingExecutionData: false });
/**
* NOTE! If you are adding new built-ins to the WorkflowDataProxy class
* make sure the built-ins parser and Task Runner handle them properly.
*/
expect(Object.keys(dataProxy)).toStrictEqual([
'$',
'$input',
'$binary',
'$data',
'$env',
'$evaluateExpression',
'$item',
'$fromAI',
'$fromai',
'$fromAi',
'$items',
'$json',
'$node',
'$self',
'$parameter',
'$prevNode',
'$runIndex',
'$mode',
'$workflow',
'$itemIndex',
'$now',
'$today',
'$jmesPath',
'DateTime',
'Interval',
'Duration',
'$execution',
'$vars',
'$secrets',
'$executionId',
'$resumeWebhookUrl',
'$getPairedItem',
'$jmespath',
'$position',
'$thisItem',
'$thisItemIndex',
'$thisRunIndex',
'$nodeVersion',
'$nodeId',
'$webhookId',
]);
});
});
});

View file

@ -0,0 +1,28 @@
import type {
AssignmentExpression,
Identifier,
Literal,
MemberExpression,
Node,
VariableDeclarator,
} from 'acorn';
export function isLiteral(node?: Node): node is Literal {
return node?.type === 'Literal';
}
export function isIdentifier(node?: Node): node is Identifier {
return node?.type === 'Identifier';
}
export function isMemberExpression(node?: Node): node is MemberExpression {
return node?.type === 'MemberExpression';
}
export function isVariableDeclarator(node?: Node): node is VariableDeclarator {
return node?.type === 'VariableDeclarator';
}
export function isAssignmentExpression(node?: Node): node is AssignmentExpression {
return node?.type === 'AssignmentExpression';
}

View file

@ -0,0 +1,74 @@
import type { N8nMessage } from '../../runner-types';
/**
* Class to keep track of which built-in variables are accessed in the code
*/
export class BuiltInsParserState {
neededNodeNames: Set<string> = new Set();
needsAllNodes = false;
needs$env = false;
needs$input = false;
needs$execution = false;
needs$prevNode = false;
constructor(opts: Partial<BuiltInsParserState> = {}) {
Object.assign(this, opts);
}
/**
* Marks that all nodes are needed, including input data
*/
markNeedsAllNodes() {
this.needsAllNodes = true;
this.needs$input = true;
this.neededNodeNames = new Set();
}
markNodeAsNeeded(nodeName: string) {
if (this.needsAllNodes) {
return;
}
this.neededNodeNames.add(nodeName);
}
markEnvAsNeeded() {
this.needs$env = true;
}
markInputAsNeeded() {
this.needs$input = true;
}
markExecutionAsNeeded() {
this.needs$execution = true;
}
markPrevNodeAsNeeded() {
this.needs$prevNode = true;
}
toDataRequestParams(): N8nMessage.ToRequester.TaskDataRequest['requestParams'] {
return {
dataOfNodes: this.needsAllNodes ? 'all' : Array.from(this.neededNodeNames),
env: this.needs$env,
input: this.needs$input,
prevNode: this.needs$prevNode,
};
}
static newNeedsAllDataState() {
const obj = new BuiltInsParserState();
obj.markNeedsAllNodes();
obj.markEnvAsNeeded();
obj.markInputAsNeeded();
obj.markExecutionAsNeeded();
obj.markPrevNodeAsNeeded();
return obj;
}
}

View file

@ -0,0 +1,142 @@
import type { CallExpression, Identifier, Node, Program } from 'acorn';
import { parse } from 'acorn';
import { ancestor } from 'acorn-walk';
import type { Result } from 'n8n-workflow';
import { toResult } from 'n8n-workflow';
import {
isAssignmentExpression,
isIdentifier,
isLiteral,
isMemberExpression,
isVariableDeclarator,
} from './acorn-helpers';
import { BuiltInsParserState } from './built-ins-parser-state';
/**
* Class for parsing Code Node code to identify which built-in variables
* are accessed
*/
export class BuiltInsParser {
/**
* Parses which built-in variables are accessed in the given code
*/
public parseUsedBuiltIns(code: string): Result<BuiltInsParserState, Error> {
return toResult(() => {
const wrappedCode = `async function VmCodeWrapper() { ${code} }`;
const ast = parse(wrappedCode, { ecmaVersion: 2025, sourceType: 'module' });
return this.identifyBuiltInsByWalkingAst(ast);
});
}
/** Traverse the AST of the script and mark any data needed for it to run. */
private identifyBuiltInsByWalkingAst(ast: Program) {
const accessedBuiltIns = new BuiltInsParserState();
ancestor(
ast,
{
CallExpression: this.visitCallExpression,
Identifier: this.visitIdentifier,
},
undefined,
accessedBuiltIns,
);
return accessedBuiltIns;
}
private visitCallExpression = (
node: CallExpression,
state: BuiltInsParserState,
ancestors: Node[],
) => {
// $(...)
const isDollar = node.callee.type === 'Identifier' && node.callee.name === '$';
if (!isDollar) return;
// $(): This is not valid, ignore
if (node.arguments.length === 0) {
return;
}
const firstArg = node.arguments[0];
if (!isLiteral(firstArg)) {
// $(variable): Can't easily determine statically, mark all nodes as needed
state.markNeedsAllNodes();
return;
}
if (typeof firstArg.value !== 'string') {
// $(123): Static value, but not a string --> invalid code --> ignore
return;
}
// $("node"): Static value, mark 'nodeName' as needed
state.markNodeAsNeeded(firstArg.value);
// Determine how $("node") is used
this.handlePrevNodeCall(node, state, ancestors);
};
private handlePrevNodeCall(_node: CallExpression, state: BuiltInsParserState, ancestors: Node[]) {
// $("node").item, .pairedItem or .itemMatching: In a case like this, the execution
// engine will traverse back from current node (i.e. the Code Node) to
// the "node" node and use `pairedItem`s to find which item is linked
// to the current item. So, we need to mark all nodes as needed.
// TODO: We could also mark all the nodes between the current node and
// the "node" node as needed, but that would require more complex logic.
const directParent = ancestors[ancestors.length - 2];
if (isMemberExpression(directParent)) {
const accessedProperty = directParent.property;
if (directParent.computed) {
// $("node")["item"], ["pairedItem"] or ["itemMatching"]
if (isLiteral(accessedProperty)) {
if (this.isPairedItemProperty(accessedProperty.value)) {
state.markNeedsAllNodes();
}
// Else: $("node")[123]: Static value, but not any of the ones above --> ignore
}
// $("node")[variable]
else if (isIdentifier(accessedProperty)) {
state.markNeedsAllNodes();
}
}
// $("node").item, .pairedItem or .itemMatching
else if (isIdentifier(accessedProperty) && this.isPairedItemProperty(accessedProperty.name)) {
state.markNeedsAllNodes();
}
} else if (isVariableDeclarator(directParent) || isAssignmentExpression(directParent)) {
// const variable = $("node") or variable = $("node"):
// In this case we would need to track down all the possible use sites
// of 'variable' and determine if `.item` is accessed on it. This is
// more complex and skipped for now.
// TODO: Optimize for this case
state.markNeedsAllNodes();
} else {
// Something else than the cases above. Mark all nodes as needed as it
// could be a dynamic access.
state.markNeedsAllNodes();
}
}
private visitIdentifier = (node: Identifier, state: BuiltInsParserState) => {
if (node.name === '$env') {
state.markEnvAsNeeded();
} else if (node.name === '$input' || node.name === '$json') {
state.markInputAsNeeded();
} else if (node.name === '$execution') {
state.markExecutionAsNeeded();
} else if (node.name === '$prevNode') {
state.markPrevNodeAsNeeded();
}
};
private isPairedItemProperty(
property?: string | boolean | null | number | RegExp | bigint,
): boolean {
return property === 'item' || property === 'pairedItem' || property === 'itemMatching';
}
}

View file

@ -24,6 +24,8 @@ import { runInNewContext, type Context } from 'node:vm';
import type { TaskResultData } from '@/runner-types';
import { type Task, TaskRunner } from '@/task-runner';
import { BuiltInsParser } from './built-ins-parser/built-ins-parser';
import { BuiltInsParserState } from './built-ins-parser/built-ins-parser-state';
import { isErrorLike } from './errors/error-like';
import { ExecutionError } from './errors/execution-error';
import { makeSerializable } from './errors/serializable-error';
@ -57,7 +59,7 @@ export interface PartialAdditionalData {
variables: IDataObject;
}
export interface AllCodeTaskData {
export interface DataRequestResponse {
workflow: Omit<WorkflowParameters, 'nodeTypes'>;
inputData: ITaskDataConnections;
node: INode;
@ -84,6 +86,8 @@ type CustomConsole = {
export class JsTaskRunner extends TaskRunner {
private readonly requireResolver: RequireResolver;
private readonly builtInsParser = new BuiltInsParser();
constructor(config: MainConfig, name = 'JS Task Runner') {
super({
taskType: 'javascript',
@ -102,12 +106,20 @@ export class JsTaskRunner extends TaskRunner {
}
async executeTask(task: Task<JSExecSettings>): Promise<TaskResultData> {
const allData = await this.requestData<AllCodeTaskData>(task.taskId, 'all');
const settings = task.settings;
a.ok(settings, 'JS Code not sent to runner');
const workflowParams = allData.workflow;
const neededBuiltInsResult = this.builtInsParser.parseUsedBuiltIns(settings.code);
const neededBuiltIns = neededBuiltInsResult.ok
? neededBuiltInsResult.result
: BuiltInsParserState.newNeedsAllDataState();
const data = await this.requestData<DataRequestResponse>(
task.taskId,
neededBuiltIns.toDataRequestParams(),
);
const workflowParams = data.workflow;
const workflow = new Workflow({
...workflowParams,
nodeTypes: this.nodeTypes,
@ -126,12 +138,12 @@ export class JsTaskRunner extends TaskRunner {
const result =
settings.nodeMode === 'runOnceForAllItems'
? await this.runForAllItems(task.taskId, settings, allData, workflow, customConsole)
: await this.runForEachItem(task.taskId, settings, allData, workflow, customConsole);
? await this.runForAllItems(task.taskId, settings, data, workflow, customConsole)
: await this.runForEachItem(task.taskId, settings, data, workflow, customConsole);
return {
result,
customData: allData.runExecutionData.resultData.metadata,
customData: data.runExecutionData.resultData.metadata,
};
}
@ -165,12 +177,12 @@ export class JsTaskRunner extends TaskRunner {
private async runForAllItems(
taskId: string,
settings: JSExecSettings,
allData: AllCodeTaskData,
data: DataRequestResponse,
workflow: Workflow,
customConsole: CustomConsole,
): Promise<INodeExecutionData[]> {
const dataProxy = this.createDataProxy(allData, workflow, allData.itemIndex);
const inputItems = allData.connectionInputData;
const dataProxy = this.createDataProxy(data, workflow, data.itemIndex);
const inputItems = data.connectionInputData;
const context: Context = {
require: this.requireResolver,
@ -212,16 +224,16 @@ export class JsTaskRunner extends TaskRunner {
private async runForEachItem(
taskId: string,
settings: JSExecSettings,
allData: AllCodeTaskData,
data: DataRequestResponse,
workflow: Workflow,
customConsole: CustomConsole,
): Promise<INodeExecutionData[]> {
const inputItems = allData.connectionInputData;
const inputItems = data.connectionInputData;
const returnData: INodeExecutionData[] = [];
for (let index = 0; index < inputItems.length; index++) {
const item = inputItems[index];
const dataProxy = this.createDataProxy(allData, workflow, index);
const dataProxy = this.createDataProxy(data, workflow, index);
const context: Context = {
require: this.requireResolver,
module: {},
@ -279,33 +291,37 @@ export class JsTaskRunner extends TaskRunner {
return returnData;
}
private createDataProxy(allData: AllCodeTaskData, workflow: Workflow, itemIndex: number) {
private createDataProxy(data: DataRequestResponse, workflow: Workflow, itemIndex: number) {
return new WorkflowDataProxy(
workflow,
allData.runExecutionData,
allData.runIndex,
data.runExecutionData,
data.runIndex,
itemIndex,
allData.activeNodeName,
allData.connectionInputData,
allData.siblingParameters,
allData.mode,
data.activeNodeName,
data.connectionInputData,
data.siblingParameters,
data.mode,
getAdditionalKeys(
allData.additionalData as IWorkflowExecuteAdditionalData,
allData.mode,
allData.runExecutionData,
data.additionalData as IWorkflowExecuteAdditionalData,
data.mode,
data.runExecutionData,
),
allData.executeData,
allData.defaultReturnRunIndex,
allData.selfData,
allData.contextNodeName,
data.executeData,
data.defaultReturnRunIndex,
data.selfData,
data.contextNodeName,
// Make sure that even if we don't receive the envProviderState for
// whatever reason, we don't expose the task runner's env to the code
allData.envProviderState ?? {
data.envProviderState ?? {
env: {},
isEnvAccessBlocked: false,
isProcessAvailable: true,
},
).getDataProxy();
// Because we optimize the needed data, it can be partially available.
// We assign the available built-ins to the execution context, which
// means we run the getter for '$json', and by default $json throws
// if there is no data available.
).getDataProxy({ throwOnMissingExecutionData: false });
}
private toExecutionErrorIfNeeded(error: unknown): Error {

View file

@ -1,6 +1,11 @@
import type { INodeExecutionData, INodeTypeBaseDescription } from 'n8n-workflow';
export type DataRequestType = 'input' | 'node' | 'all';
export interface TaskDataRequestParams {
dataOfNodes: string[] | 'all';
prevNode: boolean;
input: boolean;
env: boolean;
}
export interface TaskResultData {
result: INodeExecutionData[];
@ -89,8 +94,7 @@ export namespace N8nMessage {
type: 'broker:taskdatarequest';
taskId: string;
requestId: string;
requestType: DataRequestType;
param?: string;
requestParams: TaskDataRequestParams;
}
export interface RPC {
@ -186,8 +190,7 @@ export namespace RunnerMessage {
type: 'runner:taskdatarequest';
taskId: string;
requestId: string;
requestType: DataRequestType;
param?: string;
requestParams: TaskDataRequestParams;
}
export interface RPC {

View file

@ -288,8 +288,7 @@ export abstract class TaskRunner {
async requestData<T = unknown>(
taskId: Task['taskId'],
type: RunnerMessage.ToN8n.TaskDataRequest['requestType'],
param?: string,
requestParams: RunnerMessage.ToN8n.TaskDataRequest['requestParams'],
): Promise<T> {
const requestId = nanoid();
@ -305,8 +304,7 @@ export abstract class TaskRunner {
type: 'runner:taskdatarequest',
taskId,
requestId,
requestType: type,
param,
requestParams,
});
try {

View file

@ -494,15 +494,18 @@ describe('TaskBroker', () => {
const taskId = 'task1';
const requesterId = 'requester1';
const requestId = 'request1';
const requestType = 'input';
const param = 'test_param';
const requestParams: RunnerMessage.ToN8n.TaskDataRequest['requestParams'] = {
dataOfNodes: 'all',
env: true,
input: true,
prevNode: true,
};
const message: RunnerMessage.ToN8n.TaskDataRequest = {
type: 'runner:taskdatarequest',
taskId,
requestId,
requestType,
param,
requestParams,
};
const requesterMessageCallback = jest.fn();
@ -519,8 +522,7 @@ describe('TaskBroker', () => {
type: 'broker:taskdatarequest',
taskId,
requestId,
requestType,
param,
requestParams,
});
});

View file

@ -32,10 +32,10 @@ describe('TaskRunnerProcess', () => {
});
describe('constructor', () => {
it('should throw if runner mode is external', () => {
it('should not throw if runner mode is external', () => {
runnerConfig.mode = 'external';
expect(() => new TaskRunnerProcess(logger, runnerConfig, authService)).toThrow();
expect(() => new TaskRunnerProcess(logger, runnerConfig, authService)).not.toThrow();
runnerConfig.mode = 'internal_childprocess';
});

View file

@ -5,7 +5,17 @@ import type WebSocket from 'ws';
import type { TaskRunner } from './task-broker.service';
import type { AuthlessRequest } from '../requests';
export type DataRequestType = 'input' | 'node' | 'all';
/**
* Specifies what data should be included for a task data request.
*/
export interface TaskDataRequestParams {
dataOfNodes: string[] | 'all';
prevNode: boolean;
/** Whether input data for the node should be included */
input: boolean;
/** Whether env provider's state should be included */
env: boolean;
}
export interface TaskResultData {
result: INodeExecutionData[];
@ -101,8 +111,7 @@ export namespace N8nMessage {
type: 'broker:taskdatarequest';
taskId: string;
requestId: string;
requestType: DataRequestType;
param?: string;
requestParams: TaskDataRequestParams;
}
export interface RPC {
@ -198,8 +207,7 @@ export namespace RunnerMessage {
type: 'runner:taskdatarequest';
taskId: string;
requestId: string;
requestType: DataRequestType;
param?: string;
requestParams: TaskDataRequestParams;
}
export interface RPC {

View file

@ -178,12 +178,7 @@ export class TaskBroker {
await this.taskErrorHandler(message.taskId, message.error);
break;
case 'runner:taskdatarequest':
await this.handleDataRequest(
message.taskId,
message.requestId,
message.requestType,
message.param,
);
await this.handleDataRequest(message.taskId, message.requestId, message.requestParams);
break;
case 'runner:rpc':
@ -233,8 +228,7 @@ export class TaskBroker {
async handleDataRequest(
taskId: Task['id'],
requestId: RunnerMessage.ToN8n.TaskDataRequest['requestId'],
requestType: RunnerMessage.ToN8n.TaskDataRequest['requestType'],
param?: string,
requestParams: RunnerMessage.ToN8n.TaskDataRequest['requestParams'],
) {
const task = this.tasks.get(taskId);
if (!task) {
@ -244,8 +238,7 @@ export class TaskBroker {
type: 'broker:taskdatarequest',
taskId,
requestId,
requestType,
param,
requestParams,
});
}

View file

@ -0,0 +1,324 @@
import { mock } from 'jest-mock-extended';
import type { IExecuteFunctions, IWorkflowExecuteAdditionalData } from 'n8n-workflow';
import { type INode, type INodeExecutionData, type Workflow } from 'n8n-workflow';
import { DataRequestResponseBuilder } from '../data-request-response-builder';
import type { TaskData } from '../task-manager';
const triggerNode: INode = mock<INode>({
name: 'Trigger',
});
const debugHelperNode: INode = mock<INode>({
name: 'DebugHelper',
});
const codeNode: INode = mock<INode>({
name: 'Code',
});
const workflow: TaskData['workflow'] = mock<Workflow>();
const debugHelperNodeOutItems: INodeExecutionData[] = [
{
json: {
uid: 'abb74fd4-bef2-4fae-9d53-ea24e9eb3032',
email: 'Dan.Schmidt31@yahoo.com',
firstname: 'Toni',
lastname: 'Schuster',
password: 'Q!D6C2',
},
pairedItem: {
item: 0,
},
},
];
const codeNodeInputItems: INodeExecutionData[] = debugHelperNodeOutItems;
const connectionInputData: TaskData['connectionInputData'] = codeNodeInputItems;
const envProviderState: TaskData['envProviderState'] = mock<TaskData['envProviderState']>({
env: {},
isEnvAccessBlocked: false,
isProcessAvailable: true,
});
const additionalData = mock<IWorkflowExecuteAdditionalData>({
formWaitingBaseUrl: 'http://localhost:5678/form-waiting',
instanceBaseUrl: 'http://localhost:5678/',
restApiUrl: 'http://localhost:5678/rest',
variables: {},
webhookBaseUrl: 'http://localhost:5678/webhook',
webhookTestBaseUrl: 'http://localhost:5678/webhook-test',
webhookWaitingBaseUrl: 'http://localhost:5678/webhook-waiting',
executionId: '45844',
userId: '114984bc-44b3-4dd4-9b54-a4a8d34d51d5',
currentNodeParameters: undefined,
executionTimeoutTimestamp: undefined,
restartExecutionId: undefined,
});
const executeFunctions = mock<IExecuteFunctions>();
/**
* Drawn with https://asciiflow.com/#/
* Task data for an execution of the following WF:
* where denotes the currently being executing node.
*
*
* Trigger DebugHelper Code
*
*/
const taskData: TaskData = {
executeFunctions,
workflow,
connectionInputData,
inputData: {
main: [codeNodeInputItems],
},
itemIndex: 0,
activeNodeName: codeNode.name,
contextNodeName: codeNode.name,
defaultReturnRunIndex: -1,
mode: 'manual',
envProviderState,
node: codeNode,
runExecutionData: {
startData: {
destinationNode: codeNode.name,
runNodeFilter: [triggerNode.name, debugHelperNode.name, codeNode.name],
},
resultData: {
runData: {
[triggerNode.name]: [
{
hints: [],
startTime: 1730313407328,
executionTime: 1,
source: [],
executionStatus: 'success',
data: {
main: [[]],
},
},
],
[debugHelperNode.name]: [
{
hints: [],
startTime: 1730313407330,
executionTime: 1,
source: [
{
previousNode: triggerNode.name,
},
],
executionStatus: 'success',
data: {
main: [debugHelperNodeOutItems],
},
},
],
},
pinData: {},
},
executionData: {
contextData: {},
nodeExecutionStack: [],
metadata: {},
waitingExecution: {
[codeNode.name]: {
'0': {
main: [codeNodeInputItems],
},
},
},
waitingExecutionSource: {
[codeNode.name]: {
'0': {
main: [
{
previousNode: debugHelperNode.name,
},
],
},
},
},
},
},
runIndex: 0,
selfData: {},
siblingParameters: {},
executeData: {
node: codeNode,
data: {
main: [codeNodeInputItems],
},
source: {
main: [
{
previousNode: debugHelperNode.name,
previousNodeOutput: 0,
},
],
},
},
additionalData,
} as const;
describe('DataRequestResponseBuilder', () => {
const allDataParam: DataRequestResponseBuilder['requestParams'] = {
dataOfNodes: 'all',
env: true,
input: true,
prevNode: true,
};
const newRequestParam = (opts: Partial<DataRequestResponseBuilder['requestParams']>) => ({
...allDataParam,
...opts,
});
describe('all data', () => {
it('should build the runExecutionData as is when everything is requested', () => {
const dataRequestResponseBuilder = new DataRequestResponseBuilder(taskData, allDataParam);
const { runExecutionData } = dataRequestResponseBuilder.build();
expect(runExecutionData).toStrictEqual(taskData.runExecutionData);
});
});
describe('envProviderState', () => {
it("should filter out envProviderState when it's not requested", () => {
const dataRequestResponseBuilder = new DataRequestResponseBuilder(
taskData,
newRequestParam({
env: false,
}),
);
const result = dataRequestResponseBuilder.build();
expect(result.envProviderState).toStrictEqual({
env: {},
isEnvAccessBlocked: false,
isProcessAvailable: true,
});
});
});
describe('additionalData', () => {
it('picks only specific properties for additional data', () => {
const dataRequestResponseBuilder = new DataRequestResponseBuilder(taskData, allDataParam);
const result = dataRequestResponseBuilder.build();
expect(result.additionalData).toStrictEqual({
formWaitingBaseUrl: 'http://localhost:5678/form-waiting',
instanceBaseUrl: 'http://localhost:5678/',
restApiUrl: 'http://localhost:5678/rest',
webhookBaseUrl: 'http://localhost:5678/webhook',
webhookTestBaseUrl: 'http://localhost:5678/webhook-test',
webhookWaitingBaseUrl: 'http://localhost:5678/webhook-waiting',
executionId: '45844',
userId: '114984bc-44b3-4dd4-9b54-a4a8d34d51d5',
currentNodeParameters: undefined,
executionTimeoutTimestamp: undefined,
restartExecutionId: undefined,
variables: additionalData.variables,
});
});
});
describe('input data', () => {
const allExceptInputParam = newRequestParam({
input: false,
});
it('drops input data from executeData', () => {
const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build();
expect(result.executeData).toStrictEqual({
node: taskData.executeData!.node,
source: taskData.executeData!.source,
data: {},
});
});
it('drops input data from result', () => {
const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build();
expect(result.inputData).toStrictEqual({});
});
it('drops input data from result', () => {
const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build();
expect(result.inputData).toStrictEqual({});
});
it('drops input data from connectionInputData', () => {
const result = new DataRequestResponseBuilder(taskData, allExceptInputParam).build();
expect(result.connectionInputData).toStrictEqual([]);
});
});
describe('nodes', () => {
it('should return empty run data when only Code node is requested', () => {
const result = new DataRequestResponseBuilder(
taskData,
newRequestParam({ dataOfNodes: ['Code'], prevNode: false }),
).build();
expect(result.runExecutionData.resultData.runData).toStrictEqual({});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
});
it('should return empty run data when only Code node is requested', () => {
const result = new DataRequestResponseBuilder(
taskData,
newRequestParam({ dataOfNodes: [codeNode.name], prevNode: false }),
).build();
expect(result.runExecutionData.resultData.runData).toStrictEqual({});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
});
it("should return only DebugHelper's data when only DebugHelper node is requested", () => {
const result = new DataRequestResponseBuilder(
taskData,
newRequestParam({ dataOfNodes: [debugHelperNode.name], prevNode: false }),
).build();
expect(result.runExecutionData.resultData.runData).toStrictEqual({
[debugHelperNode.name]: taskData.runExecutionData.resultData.runData[debugHelperNode.name],
});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
});
it("should return DebugHelper's data when only prevNode node is requested", () => {
const result = new DataRequestResponseBuilder(
taskData,
newRequestParam({ dataOfNodes: [], prevNode: true }),
).build();
expect(result.runExecutionData.resultData.runData).toStrictEqual({
[debugHelperNode.name]: taskData.runExecutionData.resultData.runData[debugHelperNode.name],
});
expect(result.runExecutionData.resultData.pinData).toStrictEqual({});
// executionData & startData contain only metadata --> returned as is
expect(result.runExecutionData.startData).toStrictEqual(taskData.runExecutionData.startData);
expect(result.runExecutionData.executionData).toStrictEqual(
taskData.runExecutionData.executionData,
);
});
});
});

View file

@ -0,0 +1,205 @@
import type {
EnvProviderState,
IExecuteData,
INodeExecutionData,
IPinData,
IRunData,
IRunExecutionData,
ITaskDataConnections,
IWorkflowExecuteAdditionalData,
Workflow,
WorkflowParameters,
} from 'n8n-workflow';
import type { DataRequestResponse, PartialAdditionalData, TaskData } from './task-manager';
import type { N8nMessage } from '../runner-types';
/**
* Builds the response to a data request coming from a Task Runner. Tries to minimize
* the amount of data that is sent to the runner by only providing what is requested.
*/
export class DataRequestResponseBuilder {
private requestedNodeNames = new Set<string>();
constructor(
private readonly taskData: TaskData,
private readonly requestParams: N8nMessage.ToRequester.TaskDataRequest['requestParams'],
) {
this.requestedNodeNames = new Set(requestParams.dataOfNodes);
if (this.requestParams.prevNode && this.requestParams.dataOfNodes !== 'all') {
this.requestedNodeNames.add(this.determinePrevNodeName());
}
}
/**
* Builds a response to the data request
*/
build(): DataRequestResponse {
const { taskData: td } = this;
return {
workflow: this.buildWorkflow(td.workflow),
connectionInputData: this.buildConnectionInputData(td.connectionInputData),
inputData: this.buildInputData(td.inputData),
itemIndex: td.itemIndex,
activeNodeName: td.activeNodeName,
contextNodeName: td.contextNodeName,
defaultReturnRunIndex: td.defaultReturnRunIndex,
mode: td.mode,
envProviderState: this.buildEnvProviderState(td.envProviderState),
node: td.node, // The current node being executed
runExecutionData: this.buildRunExecutionData(td.runExecutionData),
runIndex: td.runIndex,
selfData: td.selfData,
siblingParameters: td.siblingParameters,
executeData: this.buildExecuteData(td.executeData),
additionalData: this.buildAdditionalData(td.additionalData),
};
}
private buildAdditionalData(
additionalData: IWorkflowExecuteAdditionalData,
): PartialAdditionalData {
return {
formWaitingBaseUrl: additionalData.formWaitingBaseUrl,
instanceBaseUrl: additionalData.instanceBaseUrl,
restApiUrl: additionalData.restApiUrl,
variables: additionalData.variables,
webhookBaseUrl: additionalData.webhookBaseUrl,
webhookTestBaseUrl: additionalData.webhookTestBaseUrl,
webhookWaitingBaseUrl: additionalData.webhookWaitingBaseUrl,
currentNodeParameters: additionalData.currentNodeParameters,
executionId: additionalData.executionId,
executionTimeoutTimestamp: additionalData.executionTimeoutTimestamp,
restartExecutionId: additionalData.restartExecutionId,
userId: additionalData.userId,
};
}
private buildExecuteData(executeData: IExecuteData | undefined): IExecuteData | undefined {
if (executeData === undefined) {
return undefined;
}
return {
node: executeData.node, // The current node being executed
data: this.requestParams.input ? executeData.data : {},
source: executeData.source,
};
}
private buildRunExecutionData(runExecutionData: IRunExecutionData): IRunExecutionData {
if (this.requestParams.dataOfNodes === 'all') {
return runExecutionData;
}
return {
startData: runExecutionData.startData,
resultData: {
error: runExecutionData.resultData.error,
lastNodeExecuted: runExecutionData.resultData.lastNodeExecuted,
metadata: runExecutionData.resultData.metadata,
runData: this.buildRunData(runExecutionData.resultData.runData),
pinData: this.buildPinData(runExecutionData.resultData.pinData),
},
executionData: runExecutionData.executionData
? {
// TODO: Figure out what these two are and can they be filtered
contextData: runExecutionData.executionData?.contextData,
nodeExecutionStack: runExecutionData.executionData.nodeExecutionStack,
metadata: runExecutionData.executionData.metadata,
waitingExecution: runExecutionData.executionData.waitingExecution,
waitingExecutionSource: runExecutionData.executionData.waitingExecutionSource,
}
: undefined,
};
}
private buildRunData(runData: IRunData): IRunData {
return this.filterObjectByNodeNames(runData);
}
private buildPinData(pinData: IPinData | undefined): IPinData | undefined {
return pinData ? this.filterObjectByNodeNames(pinData) : undefined;
}
private buildEnvProviderState(envProviderState: EnvProviderState): EnvProviderState {
if (this.requestParams.env) {
// In case `isEnvAccessBlocked` = true, the provider state has already sanitized
// the environment variables and we can return it as is.
return envProviderState;
}
return {
env: {},
isEnvAccessBlocked: envProviderState.isEnvAccessBlocked,
isProcessAvailable: envProviderState.isProcessAvailable,
};
}
private buildInputData(inputData: ITaskDataConnections): ITaskDataConnections {
if (this.requestParams.input) {
return inputData;
}
return {};
}
private buildConnectionInputData(
connectionInputData: INodeExecutionData[],
): INodeExecutionData[] {
if (this.requestParams.input) {
return connectionInputData;
}
return [];
}
private buildWorkflow(workflow: Workflow): Omit<WorkflowParameters, 'nodeTypes'> {
return {
id: workflow.id,
name: workflow.name,
active: workflow.active,
connections: workflow.connectionsBySourceNode,
nodes: Object.values(workflow.nodes),
pinData: workflow.pinData,
settings: workflow.settings,
staticData: workflow.staticData,
};
}
/**
* Assuming the given `obj` is an object where the keys are node names,
* filters the object to only include the node names that are requested.
*/
private filterObjectByNodeNames<T extends Record<string, unknown>>(obj: T): T {
if (this.requestParams.dataOfNodes === 'all') {
return obj;
}
const filteredObj: T = {} as T;
for (const nodeName in obj) {
if (!Object.prototype.hasOwnProperty.call(obj, nodeName)) {
continue;
}
if (this.requestedNodeNames.has(nodeName)) {
filteredObj[nodeName] = obj[nodeName];
}
}
return filteredObj;
}
private determinePrevNodeName(): string {
const sourceData = this.taskData.executeData?.source?.main?.[0];
if (!sourceData) {
return '';
}
return sourceData.previousNode;
}
}

View file

@ -18,6 +18,7 @@ import {
} from 'n8n-workflow';
import { nanoid } from 'nanoid';
import { DataRequestResponseBuilder } from './data-request-response-builder';
import {
RPC_ALLOW_LIST,
type TaskResultData,
@ -67,7 +68,7 @@ export interface PartialAdditionalData {
variables: IDataObject;
}
export interface AllCodeTaskData {
export interface DataRequestResponse {
workflow: Omit<WorkflowParameters, 'nodeTypes'>;
inputData: ITaskDataConnections;
node: INode;
@ -104,19 +105,6 @@ interface ExecuteFunctionObject {
[name: string]: ((...args: unknown[]) => unknown) | ExecuteFunctionObject;
}
const workflowToParameters = (workflow: Workflow): Omit<WorkflowParameters, 'nodeTypes'> => {
return {
id: workflow.id,
name: workflow.name,
active: workflow.active,
connections: workflow.connectionsBySourceNode,
nodes: Object.values(workflow.nodes),
pinData: workflow.pinData,
settings: workflow.settings,
staticData: workflow.staticData,
};
};
export class TaskManager {
requestAcceptRejects: Map<string, { accept: RequestAccept; reject: RequestReject }> = new Map();
@ -245,7 +233,7 @@ export class TaskManager {
this.taskError(message.taskId, message.error);
break;
case 'broker:taskdatarequest':
this.sendTaskData(message.taskId, message.requestId, message.requestType);
this.sendTaskData(message.taskId, message.requestId, message.requestParams);
break;
case 'broker:rpc':
void this.handleRpc(message.taskId, message.callId, message.name, message.params);
@ -294,55 +282,24 @@ export class TaskManager {
sendTaskData(
taskId: string,
requestId: string,
requestType: N8nMessage.ToRequester.TaskDataRequest['requestType'],
requestParams: N8nMessage.ToRequester.TaskDataRequest['requestParams'],
) {
const job = this.tasks.get(taskId);
if (!job) {
// TODO: logging
return;
}
if (requestType === 'all') {
const jd = job.data;
const ad = jd.additionalData;
const data: AllCodeTaskData = {
workflow: workflowToParameters(jd.workflow),
connectionInputData: jd.connectionInputData,
inputData: jd.inputData,
itemIndex: jd.itemIndex,
activeNodeName: jd.activeNodeName,
contextNodeName: jd.contextNodeName,
defaultReturnRunIndex: jd.defaultReturnRunIndex,
mode: jd.mode,
envProviderState: jd.envProviderState,
node: jd.node,
runExecutionData: jd.runExecutionData,
runIndex: jd.runIndex,
selfData: jd.selfData,
siblingParameters: jd.siblingParameters,
executeData: jd.executeData,
additionalData: {
formWaitingBaseUrl: ad.formWaitingBaseUrl,
instanceBaseUrl: ad.instanceBaseUrl,
restApiUrl: ad.restApiUrl,
variables: ad.variables,
webhookBaseUrl: ad.webhookBaseUrl,
webhookTestBaseUrl: ad.webhookTestBaseUrl,
webhookWaitingBaseUrl: ad.webhookWaitingBaseUrl,
currentNodeParameters: ad.currentNodeParameters,
executionId: ad.executionId,
executionTimeoutTimestamp: ad.executionTimeoutTimestamp,
restartExecutionId: ad.restartExecutionId,
userId: ad.userId,
},
};
const dataRequestResponseBuilder = new DataRequestResponseBuilder(job.data, requestParams);
const requestedData = dataRequestResponseBuilder.build();
this.sendMessage({
type: 'requester:taskdataresponse',
taskId,
requestId,
data,
data: requestedData,
});
}
}
async handleRpc(
taskId: string,

View file

@ -68,15 +68,14 @@ export class TaskRunnerProcess extends TypedEmitter<TaskRunnerProcessEventMap> {
) {
super();
a.ok(
this.runnerConfig.mode === 'internal_childprocess' ||
this.runnerConfig.mode === 'internal_launcher',
);
this.logger = logger.scoped('task-runner');
}
async start() {
a.ok(
this.runnerConfig.mode === 'internal_childprocess' ||
this.runnerConfig.mode === 'internal_launcher',
);
a.ok(!this.process, 'Task Runner Process already running');
const grantToken = await this.authService.createGrantToken();

View file

@ -388,8 +388,13 @@ export class WorkflowDataProxy {
* @private
* @param {string} nodeName The name of the node query data from
* @param {boolean} [shortSyntax=false] If short syntax got used
* @param {boolean} [throwOnMissingExecutionData=true] If an error should get thrown if no execution data is available
*/
private nodeDataGetter(nodeName: string, shortSyntax = false) {
private nodeDataGetter(
nodeName: string,
shortSyntax = false,
throwOnMissingExecutionData = true,
) {
const that = this;
const node = this.workflow.nodes[nodeName];
@ -416,6 +421,10 @@ export class WorkflowDataProxy {
shortSyntax,
});
if (executionData.length === 0 && !throwOnMissingExecutionData) {
return undefined;
}
if (executionData.length === 0) {
if (that.workflow.getParentNodes(nodeName).length === 0) {
throw new ExpressionError('No execution data available', {
@ -613,7 +622,7 @@ export class WorkflowDataProxy {
* Returns the data proxy object which allows to query data from current run
*
*/
getDataProxy(): IWorkflowDataProxyData {
getDataProxy(opts?: { throwOnMissingExecutionData: boolean }): IWorkflowDataProxyData {
const that = this;
// replacing proxies with the actual data.
@ -1367,6 +1376,7 @@ export class WorkflowDataProxy {
$nodeId: that.workflow.getNode(that.activeNodeName)?.id,
$webhookId: that.workflow.getNode(that.activeNodeName)?.webhookId,
};
const throwOnMissingExecutionData = opts?.throwOnMissingExecutionData ?? true;
return new Proxy(base, {
has: () => true,
@ -1374,10 +1384,11 @@ export class WorkflowDataProxy {
if (name === 'isProxy') return true;
if (['$data', '$json'].includes(name as string)) {
return that.nodeDataGetter(that.contextNodeName, true)?.json;
return that.nodeDataGetter(that.contextNodeName, true, throwOnMissingExecutionData)?.json;
}
if (name === '$binary') {
return that.nodeDataGetter(that.contextNodeName, true)?.binary;
return that.nodeDataGetter(that.contextNodeName, true, throwOnMissingExecutionData)
?.binary;
}
return Reflect.get(target, name, receiver);

View file

@ -26,6 +26,7 @@ const getProxyFromFixture = (
run: IRun | null,
activeNode: string,
mode?: WorkflowExecuteMode,
opts?: { throwOnMissingExecutionData: boolean },
) => {
const taskData = run?.data.resultData.runData[activeNode]?.[0];
const lastNodeConnectionInputData = taskData?.data?.main[0];
@ -73,7 +74,7 @@ const getProxyFromFixture = (
executeData,
);
return dataProxy.getDataProxy();
return dataProxy.getDataProxy(opts);
};
describe('WorkflowDataProxy', () => {
@ -404,4 +405,42 @@ describe('WorkflowDataProxy', () => {
expect(proxy.$node.PinnedSet.json.firstName).toBe('Joe');
});
});
describe('Partial data', () => {
const fixture = loadFixture('partial_data');
describe('Default behaviour (throw on missing execution data)', () => {
const proxy = getProxyFromFixture(fixture.workflow, fixture.run, 'End');
test('$binary', () => {
expect(() => proxy.$binary).toThrowError(ExpressionError);
});
test('$json', () => {
expect(() => proxy.$json).toThrowError(ExpressionError);
});
test('$data', () => {
expect(() => proxy.$data).toThrowError(ExpressionError);
});
});
describe("Don't throw on missing execution data)", () => {
const proxy = getProxyFromFixture(fixture.workflow, fixture.run, 'End', undefined, {
throwOnMissingExecutionData: false,
});
test('$binary', () => {
expect(proxy.$binary).toBeUndefined();
});
test('$json', () => {
expect(proxy.$json).toBeUndefined();
});
test('$data', () => {
expect(proxy.$data).toBeUndefined();
});
});
});
});

View file

@ -0,0 +1,71 @@
{
"data": {
"startData": {},
"resultData": {
"runData": {
"Start": [
{
"startTime": 1,
"executionTime": 1,
"data": {
"main": [
[
{
"json": {}
}
]
]
},
"source": []
}
],
"Function": [
{
"startTime": 1,
"executionTime": 1,
"data": {
"main": [[]]
},
"source": [
{
"previousNode": "Start"
}
]
}
],
"Rename": [
{
"startTime": 1,
"executionTime": 1,
"data": {
"main": [[]]
},
"source": [
{
"previousNode": "Function"
}
]
}
],
"End": [
{
"startTime": 1,
"executionTime": 1,
"data": {
"main": [[]]
},
"source": [
{
"previousNode": "Rename"
}
]
}
]
}
}
},
"mode": "manual",
"startedAt": "2024-02-08T15:45:18.848Z",
"stoppedAt": "2024-02-08T15:45:18.862Z",
"status": "running"
}

View file

@ -0,0 +1,86 @@
{
"name": "",
"nodes": [
{
"name": "Start",
"type": "test.set",
"parameters": {},
"typeVersion": 1,
"id": "uuid-1",
"position": [100, 200]
},
{
"name": "Function",
"type": "test.set",
"parameters": {
"functionCode": "// Code here will run only once, no matter how many input items there are.\n// More info and help: https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-base.function/\nconst { DateTime, Duration, Interval } = require(\"luxon\");\n\nconst data = [\n {\n \"length\": 105\n },\n {\n \"length\": 160\n },\n {\n \"length\": 121\n },\n {\n \"length\": 275\n },\n {\n \"length\": 950\n },\n];\n\nreturn data.map(fact => ({json: fact}));"
},
"typeVersion": 1,
"id": "uuid-2",
"position": [280, 200]
},
{
"name": "Rename",
"type": "test.set",
"parameters": {
"value1": "data",
"value2": "initialName"
},
"typeVersion": 1,
"id": "uuid-3",
"position": [460, 200]
},
{
"name": "Set",
"type": "test.set",
"parameters": {},
"typeVersion": 1,
"id": "uuid-4",
"position": [640, 200]
},
{
"name": "End",
"type": "test.set",
"parameters": {},
"typeVersion": 1,
"id": "uuid-5",
"position": [640, 200]
}
],
"pinData": {},
"connections": {
"Start": {
"main": [
[
{
"node": "Function",
"type": "main",
"index": 0
}
]
]
},
"Function": {
"main": [
[
{
"node": "Rename",
"type": "main",
"index": 0
}
]
]
},
"Rename": {
"main": [
[
{
"node": "End",
"type": "main",
"index": 0
}
]
]
}
}
}

View file

@ -645,6 +645,12 @@ importers:
'@n8n/config':
specifier: workspace:*
version: link:../config
acorn:
specifier: 8.14.0
version: 8.14.0
acorn-walk:
specifier: 8.3.4
version: 8.3.4
n8n-core:
specifier: workspace:*
version: link:../../core
@ -1090,7 +1096,7 @@ importers:
dependencies:
'@langchain/core':
specifier: 'catalog:'
version: 0.3.3(patch_hash=ekay3bw7hexufl733lypqvmx2e)(openai@4.63.0(zod@3.23.8))
version: 0.3.3(patch_hash=ekay3bw7hexufl733lypqvmx2e)(openai@4.63.0(encoding@0.1.13)(zod@3.23.8))
'@n8n/client-oauth2':
specifier: workspace:*
version: link:../@n8n/client-oauth2
@ -1921,7 +1927,7 @@ importers:
devDependencies:
'@langchain/core':
specifier: 'catalog:'
version: 0.3.3(patch_hash=ekay3bw7hexufl733lypqvmx2e)(openai@4.63.0)
version: 0.3.3(patch_hash=ekay3bw7hexufl733lypqvmx2e)(openai@4.63.0(encoding@0.1.13)(zod@3.23.8))
'@types/deep-equal':
specifier: ^1.0.1
version: 1.0.1
@ -2227,7 +2233,7 @@ packages:
'@azure/core-http@3.0.4':
resolution: {integrity: sha512-Fok9VVhMdxAFOtqiiAtg74fL0UJkt0z3D+ouUUxcRLzZNBioPRAMJFVxiWoJljYpXsRi4GDQHzQHDc9AiYaIUQ==}
engines: {node: '>=14.0.0'}
deprecated: deprecating as we migrated to core v2
deprecated: This package is no longer supported. Please migrate to use @azure/core-rest-pipeline
'@azure/core-lro@2.4.0':
resolution: {integrity: sha512-F65+rYkll1dpw3RGm8/SSiSj+/QkMeYDanzS/QKlM1dmuneVyXbO46C88V1MRHluLGdMP6qfD3vDRYALn0z0tQ==}
@ -5475,10 +5481,6 @@ packages:
peerDependencies:
acorn: ^6.0.0 || ^7.0.0 || ^8.0.0
acorn-walk@8.3.2:
resolution: {integrity: sha512-cjkyv4OtNCIeqhHrfS81QWXoCBPExR/J62oyEqepVw8WaQeSqpW2uhuLPh1m9eWhDuOo/jUXVTlifvesOWp/4A==}
engines: {node: '>=0.4.0'}
acorn-walk@8.3.4:
resolution: {integrity: sha512-ueEepnujpqee2o5aIYnvHU6C0A42MNdsIDeqy5BydrkuC5R1ZuUFnm27EeFJGoEHJQgn3uleRvmTXaJgfXbt4g==}
engines: {node: '>=0.4.0'}
@ -5493,6 +5495,11 @@ packages:
engines: {node: '>=0.4.0'}
hasBin: true
acorn@8.14.0:
resolution: {integrity: sha512-cl669nCJTZBsL97OF4kUQm5g5hC2uihk0NxY3WENAC0TYdILVkAyHymAntgxGkl7K+t0cXIrH5siy5S4XkFycA==}
engines: {node: '>=0.4.0'}
hasBin: true
adm-zip@0.5.10:
resolution: {integrity: sha512-x0HvcHqVJNTPk/Bw8JbLWlWoo6Wwnsug0fnYYro1HBrjxZ3G7/AZk7Ahv8JwDe1uIcz8eBqvu86FuF1POiG7vQ==}
engines: {node: '>=6.0'}
@ -14689,38 +14696,6 @@ snapshots:
transitivePeerDependencies:
- openai
'@langchain/core@0.3.3(patch_hash=ekay3bw7hexufl733lypqvmx2e)(openai@4.63.0(zod@3.23.8))':
dependencies:
ansi-styles: 5.2.0
camelcase: 6.3.0
decamelize: 1.2.0
js-tiktoken: 1.0.12
langsmith: 0.1.59(openai@4.63.0(zod@3.23.8))
mustache: 4.2.0
p-queue: 6.6.2
p-retry: 4.6.2
uuid: 10.0.0
zod: 3.23.8
zod-to-json-schema: 3.23.3(zod@3.23.8)
transitivePeerDependencies:
- openai
'@langchain/core@0.3.3(patch_hash=ekay3bw7hexufl733lypqvmx2e)(openai@4.63.0)':
dependencies:
ansi-styles: 5.2.0
camelcase: 6.3.0
decamelize: 1.2.0
js-tiktoken: 1.0.12
langsmith: 0.1.59(openai@4.63.0)
mustache: 4.2.0
p-queue: 6.6.2
p-retry: 4.6.2
uuid: 10.0.0
zod: 3.23.8
zod-to-json-schema: 3.23.3(zod@3.23.8)
transitivePeerDependencies:
- openai
'@langchain/google-common@0.1.1(@langchain/core@0.3.3(patch_hash=ekay3bw7hexufl733lypqvmx2e)(openai@4.63.0(encoding@0.1.13)(zod@3.23.8)))(zod@3.23.8)':
dependencies:
'@langchain/core': 0.3.3(patch_hash=ekay3bw7hexufl733lypqvmx2e)(openai@4.63.0(encoding@0.1.13)(zod@3.23.8))
@ -15031,7 +15006,7 @@ snapshots:
'@n8n/vm2@3.9.25':
dependencies:
acorn: 8.12.1
acorn-walk: 8.3.2
acorn-walk: 8.3.4
'@n8n_io/ai-assistant-sdk@1.10.3': {}
@ -17232,7 +17207,7 @@ snapshots:
'@vue/test-utils@2.4.6':
dependencies:
js-beautify: 1.14.9
vue-component-type-helpers: 2.1.6
vue-component-type-helpers: 2.1.8
'@vueuse/components@10.11.0(vue@3.5.11(typescript@5.6.2))':
dependencies:
@ -17306,24 +17281,23 @@ snapshots:
acorn-globals@7.0.1:
dependencies:
acorn: 8.12.1
acorn-walk: 8.3.2
acorn: 8.14.0
acorn-walk: 8.3.4
acorn-jsx@5.3.2(acorn@8.12.1):
acorn-jsx@5.3.2(acorn@8.14.0):
dependencies:
acorn: 8.12.1
acorn-walk@8.3.2: {}
acorn: 8.14.0
acorn-walk@8.3.4:
dependencies:
acorn: 8.12.1
optional: true
acorn: 8.14.0
acorn@7.4.1: {}
acorn@8.12.1: {}
acorn@8.14.0: {}
adm-zip@0.5.10: {}
agent-base@6.0.2:
@ -19316,7 +19290,7 @@ snapshots:
eslint-import-resolver-node@0.3.9:
dependencies:
debug: 3.2.7(supports-color@5.5.0)
debug: 3.2.7(supports-color@8.1.1)
is-core-module: 2.13.1
resolve: 1.22.8
transitivePeerDependencies:
@ -19341,7 +19315,7 @@ snapshots:
eslint-module-utils@2.8.0(@typescript-eslint/parser@7.2.0(eslint@8.57.0)(typescript@5.6.2))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@7.2.0(eslint@8.57.0)(typescript@5.6.2))(eslint-plugin-import@2.29.1)(eslint@8.57.0))(eslint@8.57.0):
dependencies:
debug: 3.2.7(supports-color@5.5.0)
debug: 3.2.7(supports-color@8.1.1)
optionalDependencies:
'@typescript-eslint/parser': 7.2.0(eslint@8.57.0)(typescript@5.6.2)
eslint: 8.57.0
@ -19361,7 +19335,7 @@ snapshots:
array.prototype.findlastindex: 1.2.3
array.prototype.flat: 1.3.2
array.prototype.flatmap: 1.3.2
debug: 3.2.7(supports-color@5.5.0)
debug: 3.2.7(supports-color@8.1.1)
doctrine: 2.1.0
eslint: 8.57.0
eslint-import-resolver-node: 0.3.9
@ -19504,8 +19478,8 @@ snapshots:
espree@9.6.1:
dependencies:
acorn: 8.12.1
acorn-jsx: 5.3.2(acorn@8.12.1)
acorn: 8.14.0
acorn-jsx: 5.3.2(acorn@8.14.0)
eslint-visitor-keys: 3.4.3
esprima-next@5.8.4: {}
@ -20159,7 +20133,7 @@ snapshots:
array-parallel: 0.1.3
array-series: 0.1.5
cross-spawn: 4.0.2
debug: 3.2.7(supports-color@5.5.0)
debug: 3.2.7(supports-color@8.1.1)
transitivePeerDependencies:
- supports-color
@ -21498,28 +21472,6 @@ snapshots:
optionalDependencies:
openai: 4.63.0(encoding@0.1.13)(zod@3.23.8)
langsmith@0.1.59(openai@4.63.0(zod@3.23.8)):
dependencies:
'@types/uuid': 10.0.0
commander: 10.0.1
p-queue: 6.6.2
p-retry: 4.6.2
semver: 7.6.0
uuid: 10.0.0
optionalDependencies:
openai: 4.63.0(zod@3.23.8)
langsmith@0.1.59(openai@4.63.0):
dependencies:
'@types/uuid': 10.0.0
commander: 10.0.1
p-queue: 6.6.2
p-retry: 4.6.2
semver: 7.6.0
uuid: 10.0.0
optionalDependencies:
openai: 4.63.0(zod@3.23.8)
lazy-ass@1.6.0: {}
ldapts@4.2.6:
@ -22352,14 +22304,14 @@ snapshots:
mlly@1.4.2:
dependencies:
acorn: 8.12.1
acorn: 8.14.0
pathe: 1.1.2
pkg-types: 1.0.3
ufo: 1.3.2
mlly@1.7.1:
dependencies:
acorn: 8.12.1
acorn: 8.14.0
pathe: 1.1.2
pkg-types: 1.1.3
ufo: 1.5.4
@ -22864,22 +22816,6 @@ snapshots:
- encoding
- supports-color
openai@4.63.0(zod@3.23.8):
dependencies:
'@types/node': 18.16.16
'@types/node-fetch': 2.6.4
abort-controller: 3.0.0
agentkeepalive: 4.2.1
form-data-encoder: 1.7.2
formdata-node: 4.4.1
node-fetch: 2.7.0(encoding@0.1.13)
optionalDependencies:
zod: 3.23.8
transitivePeerDependencies:
- encoding
- supports-color
optional: true
openapi-sampler@1.5.1:
dependencies:
'@types/json-schema': 7.0.15
@ -23060,7 +22996,7 @@ snapshots:
pdf-parse@1.1.1:
dependencies:
debug: 3.2.7(supports-color@5.5.0)
debug: 3.2.7(supports-color@8.1.1)
node-ensure: 0.0.0
transitivePeerDependencies:
- supports-color
@ -23889,7 +23825,7 @@ snapshots:
rhea@1.0.24:
dependencies:
debug: 3.2.7(supports-color@5.5.0)
debug: 3.2.7(supports-color@8.1.1)
transitivePeerDependencies:
- supports-color
@ -24786,7 +24722,7 @@ snapshots:
terser@5.16.1:
dependencies:
'@jridgewell/source-map': 0.3.6
acorn: 8.12.1
acorn: 8.14.0
commander: 2.20.3
source-map-support: 0.5.21
optional: true
@ -24959,7 +24895,7 @@ snapshots:
'@tsconfig/node14': 1.0.3
'@tsconfig/node16': 1.0.4
'@types/node': 18.16.16
acorn: 8.12.1
acorn: 8.14.0
acorn-walk: 8.3.4
arg: 4.1.3
create-require: 1.1.1
@ -25253,14 +25189,14 @@ snapshots:
unplugin@1.0.1:
dependencies:
acorn: 8.12.1
acorn: 8.14.0
chokidar: 4.0.1
webpack-sources: 3.2.3
webpack-virtual-modules: 0.5.0
unplugin@1.11.0:
dependencies:
acorn: 8.12.1
acorn: 8.14.0
chokidar: 4.0.1
webpack-sources: 3.2.3
webpack-virtual-modules: 0.6.1